From dbee7db6a091c6d3e43c0626bca1232de2618f53 Mon Sep 17 00:00:00 2001 From: Hari Dara Date: Wed, 4 Feb 2026 19:34:47 +0530 Subject: [PATCH 1/8] PHOENIX-7759: Initial cursor changes --- .../phoenix/exception/SQLExceptionCode.java | 3 + .../apache/phoenix/execute/MutationState.java | 62 +++- .../jdbc/MutationLimitBatchException.java | 57 ++++ .../apache/phoenix/jdbc/PhoenixStatement.java | 24 ++ .../apache/phoenix/query/QueryServices.java | 2 + .../phoenix/query/QueryServicesOptions.java | 1 + .../schema/MutationLimitReachedException.java | 39 +++ .../phoenix/end2end/MutationStateIT.java | 308 ++++++++++++++++++ 8 files changed, 486 insertions(+), 10 deletions(-) create mode 100644 phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/MutationLimitBatchException.java create mode 100644 phoenix-core-client/src/main/java/org/apache/phoenix/schema/MutationLimitReachedException.java diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index 5d9fde16597..3eddf2278fe 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -740,6 +740,9 @@ public SQLException newException(SQLExceptionInfo info) { info.getMaxPhoenixColumnSizeBytes(), info.getPhoenixColumnSizeBytes()); } }), + MUTATION_LIMIT_REACHED(733, "LIM04", + "Mutation buffer limit reached. Existing mutations are preserved. " + + "Commit current mutations and retry the failed operation."), INSUFFICIENT_MEMORY(999, "50M01", "Unable to allocate enough memory."), HASH_JOIN_CACHE_NOT_FOUND(900, "HJ01", "Hash Join cache not found"), diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java index fb82692be53..69653170c7f 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -31,10 +31,12 @@ import static org.apache.phoenix.monitoring.MetricType.UPSERT_AGGREGATE_FAILURE_SQL_COUNTER; import static org.apache.phoenix.monitoring.MetricType.UPSERT_AGGREGATE_SUCCESS_SQL_COUNTER; import static org.apache.phoenix.query.QueryServices.INDEX_REGION_OBSERVER_ENABLED_ALL_TABLES_ATTRIB; +import static org.apache.phoenix.query.QueryServices.PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB; import static org.apache.phoenix.query.QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB; import static org.apache.phoenix.query.QueryServices.SOURCE_OPERATION_ATTRIB; import static org.apache.phoenix.query.QueryServices.WILDCARD_QUERY_DYNAMIC_COLS_ATTRIB; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_INDEX_REGION_OBSERVER_ENABLED_ALL_TABLES; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_WILDCARD_QUERY_DYNAMIC_COLS_ATTRIB; import static org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkNotNull; @@ -98,6 +100,7 @@ import org.apache.phoenix.schema.IllegalDataException; import org.apache.phoenix.schema.MaxMutationSizeBytesExceededException; import 
org.apache.phoenix.schema.MaxMutationSizeExceededException; +import org.apache.phoenix.schema.MutationLimitReachedException; import org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PIndexState; @@ -187,6 +190,7 @@ public class MutationState implements SQLCloseable { private final boolean indexRegionObserverEnabledAllTables; private final boolean serverSideImmutableIndexes; + private final boolean preserveOnLimitExceeded; /** * Return result back to client. To be used when client needs to read the whole row or some @@ -268,6 +272,9 @@ Maps.> newHashMapWithExpectedSize(5), subT this.serverSideImmutableIndexes = this.connection.getQueryServices().getConfiguration() .getBoolean(SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB, DEFAULT_SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED); + this.preserveOnLimitExceeded = this.connection.getQueryServices().getProps() + .getBoolean(PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB, + DEFAULT_PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED); } public MutationState(TableRef table, MultiRowMutationState mutations, long sizeOffset, @@ -495,15 +502,23 @@ public static MutationState emptyMutationState(int maxSize, long maxSizeBytes, } private void throwIfTooBig() throws SQLException { - if (numRows > maxSize) { - int mutationSize = numRows; - resetState(); - throw new MaxMutationSizeExceededException(maxSize, mutationSize); - } - if (estimatedSize > maxSizeBytes) { - long mutationSizeByte = estimatedSize; - resetState(); - throw new MaxMutationSizeBytesExceededException(maxSizeBytes, mutationSizeByte); + if (numRows > maxSize || estimatedSize > maxSizeBytes) { + if (preserveOnLimitExceeded) { + // Preserve mode: throw new exception WITHOUT resetting state + throw new MutationLimitReachedException(); + } else { + // Legacy mode: reset state and throw old exceptions + if (numRows > maxSize) { + int mutationSize = numRows; + resetState(); + throw new MaxMutationSizeExceededException(maxSize, mutationSize); + } + if (estimatedSize > maxSizeBytes) { + long mutationSizeByte = estimatedSize; + resetState(); + throw new MaxMutationSizeBytesExceededException(maxSizeBytes, mutationSizeByte); + } + } } } @@ -615,6 +630,22 @@ private void joinMutationState(Map> srcMut } } + /** + * Pre-checks if joining the given MutationState would exceed configured limits. + * Throws MutationLimitReachedException if limits would be exceeded. + * Does NOT modify any state - this is a read-only check. + * + * @param newMutationState the state that would be joined + * @throws MutationLimitReachedException if joining would exceed limits + */ + private void checkLimitBeforeJoin(MutationState newMutationState) + throws MutationLimitReachedException { + if (this.numRows + newMutationState.numRows > maxSize || + this.estimatedSize + newMutationState.estimatedSize > maxSizeBytes) { + throw new MutationLimitReachedException(); + } + } + /** * Combine a newer mutation with this one, where in the event of overlaps, the newer one will take * precedence. Combine any metrics collected for the newer mutation. 
@param newMutationState the newer mutation state @@ -624,6 +655,12 @@ public void join(MutationState newMutationState) throws SQLException { return; } + // Pre-check: if preserveOnLimitExceeded is enabled, check before joining + // to avoid merging mutations that would exceed the limit + if (preserveOnLimitExceeded) { + checkLimitBeforeJoin(newMutationState); + } + phoenixTransactionContext.join(newMutationState.getPhoenixTransactionContext()); this.sizeOffset += newMutationState.sizeOffset; @@ -640,7 +677,12 @@ public void join(MutationState newMutationState) throws SQLException { } else if (readMetricQueue != null && newMutationState.readMetricQueue != null) { readMetricQueue.combineReadMetrics(newMutationState.readMetricQueue); } - throwIfTooBig(); + + // Post-check: only run in legacy mode (when preserveOnLimitExceeded is false) + // In preserve mode, we trust the pre-check and skip this + if (!preserveOnLimitExceeded) { + throwIfTooBig(); + } } private static ImmutableBytesPtr getNewRowKeyWithRowTimestamp(ImmutableBytesPtr ptr, diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/MutationLimitBatchException.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/MutationLimitBatchException.java new file mode 100644 index 00000000000..cf1ff620037 --- /dev/null +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/MutationLimitBatchException.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + +import java.sql.BatchUpdateException; +import org.apache.phoenix.schema.MutationLimitReachedException; + +/** + * Thrown from executeBatch() when the mutation buffer limit is reached. + * The batch is automatically trimmed to contain only unprocessed items. + */ +public class MutationLimitBatchException extends BatchUpdateException { + private static final long serialVersionUID = 1L; + + private final int processedCount; + + /** + * @param updateCounts array of update counts for each statement in the batch + * @param cause the underlying MutationLimitReachedException + * @param processedCount number of statements successfully processed + */ + public MutationLimitBatchException( + int[] updateCounts, + MutationLimitReachedException cause, + int processedCount) { + super(cause.getMessage(), + cause.getSQLState(), + cause.getErrorCode(), + updateCounts, + cause); + this.processedCount = processedCount; + } + + /** + * Returns the number of statements that were successfully processed + * before the limit was reached. The batch has been trimmed to contain + * only the remaining unprocessed items.
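+ * <p>
+ * For example, a client can flush the preserved mutations and re-run the trimmed batch
+ * (illustrative sketch only, assuming autoCommit=false and
+ * phoenix.mutate.preserveOnLimitExceeded=true, as in the MutationStateIT changes below):
+ * <pre>{@code
+ * while (true) {
+ *   try {
+ *     stmt.executeBatch();
+ *     conn.commit();
+ *     break;
+ *   } catch (MutationLimitBatchException e) {
+ *     conn.commit(); // buffered mutations survived the exception; flush them
+ *     // the batch now holds only the unprocessed items, so simply retry
+ *   }
+ * }
+ * }</pre>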
+ */ + public int getProcessedCount() { + return processedCount; + } +} diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index b527ecd0420..5a1ca86f607 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -204,6 +204,7 @@ import org.apache.phoenix.schema.FunctionNotFoundException; import org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.MetaDataEntityNotFoundException; +import org.apache.phoenix.schema.MutationLimitReachedException; import org.apache.phoenix.schema.PColumnImpl; import org.apache.phoenix.schema.PDatum; import org.apache.phoenix.schema.PIndexState; @@ -2451,6 +2452,29 @@ public int[] executeBatch() throws SQLException { connection.commit(); } return returnCodes; + } catch (MutationLimitReachedException limitEx) { + // Special handling: limit reached but existing mutations preserved + // Statements 0 through i-1 were successfully added to MutationState + // Statement at index i was NOT added (its join was rejected by the pre-check) + + // If original autoCommit was true, commit what we have buffered + if (autoCommit) { + connection.commit(); + } + + // Trim the batch list to contain only unprocessed items (from index i to end) + if (i > 0) { + List remaining = + new ArrayList<>(batch.subList(i, batch.size())); + batch.clear(); + batch.addAll(remaining); + } + + // Mark the failed index + returnCodes[i] = Statement.EXECUTE_FAILED; + + // Throw MutationLimitBatchException with checkpoint information + throw new MutationLimitBatchException(returnCodes, limitEx, i); } catch (SQLException t) { if (i == returnCodes.length) { // Exception after for loop, perhaps in commit(), discard returnCodes. 
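The executeUpdate() path supports an analogous commit-and-retry loop without batches. A minimal client-side sketch (assumptions: autoCommit=false, phoenix.mutate.preserveOnLimitExceeded=true; upsertRow is a hypothetical helper, not part of this patch):

    int processed = 0;
    while (processed < totalRows) {
        try {
            for (int i = processed; i < totalRows; i++) {
                upsertRow(stmt, i); // hypothetical helper: bind row i, then executeUpdate()
                processed = i + 1;
            }
            conn.commit(); // flush the final partial buffer
            break;
        } catch (MutationLimitReachedException e) {
            conn.commit(); // buffered rows are preserved; flush them and resume at 'processed'
        }
    }

The rejected row was never merged into the connection's MutationState, so the loop simply re-executes it after the commit; the MutationStateIT tests added below exercise exactly this pattern.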
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java index dc5d042e464..0d120d79842 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -97,6 +97,8 @@ public interface QueryServices extends SQLCloseable { public static final String SCAN_CACHE_SIZE_ATTRIB = "hbase.client.scanner.caching"; public static final String MAX_MUTATION_SIZE_ATTRIB = "phoenix.mutate.maxSize"; public static final String MAX_MUTATION_SIZE_BYTES_ATTRIB = "phoenix.mutate.maxSizeBytes"; + public static final String PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB = + "phoenix.mutate.preserveOnLimitExceeded"; public static final String HBASE_CLIENT_KEYVALUE_MAXSIZE = "hbase.client.keyvalue.maxsize"; public static final String MUTATE_BATCH_SIZE_ATTRIB = "phoenix.mutate.batchSize"; diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 2af299aa2c3..31e72c4ce30 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -174,6 +174,7 @@ public class QueryServicesOptions { public static final boolean DEFAULT_CALL_QUEUE_ROUND_ROBIN = true; public static final int DEFAULT_MAX_MUTATION_SIZE = 500000; public static final int DEFAULT_MAX_MUTATION_SIZE_BYTES = 104857600; // 100 Mb + public static final boolean DEFAULT_PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED = false; public static final int DEFAULT_HBASE_CLIENT_KEYVALUE_MAXSIZE = 10485760; // 10 Mb public static final boolean DEFAULT_USE_INDEXES = true; // Use indexes public static final boolean DEFAULT_IMMUTABLE_ROWS = false; // Tables rows may be updated diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MutationLimitReachedException.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MutationLimitReachedException.java new file mode 100644 index 00000000000..ac177160c04 --- /dev/null +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MutationLimitReachedException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.schema; + +import java.sql.SQLException; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; + +/** + * Exception thrown when a mutation operation would exceed the configured + * mutation buffer size limit. 
Unlike MaxMutationSizeBytesExceededException, + * existing buffered mutations are preserved and can be committed before retrying. + */ +public class MutationLimitReachedException extends SQLException { + private static final long serialVersionUID = 1L; + private static final SQLExceptionCode CODE = + SQLExceptionCode.MUTATION_LIMIT_REACHED; + + public MutationLimitReachedException() { + super(new SQLExceptionInfo.Builder(CODE).build().toString(), + CODE.getSQLState(), + CODE.getErrorCode()); + } +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java index c654601a4ac..c6f18be6a90 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java @@ -45,8 +45,10 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.execute.MutationState; +import org.apache.phoenix.jdbc.MutationLimitBatchException; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.MutationLimitReachedException; import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.util.Repeat; @@ -313,6 +315,312 @@ public void testUpsertMaxMutationSize() throws Exception { } } + /** + * Tests that when preserveOnLimitExceeded=true, executeUpdate() throws + * MutationLimitReachedException without clearing buffered mutations, + * allowing the client to commit and retry. + */ + @Test + public void testExecuteUpdatePreserveOnLimitExceeded() throws Exception { + String fullTableName = generateUniqueName(); + int maxMutationSize = 5; + + // Create table + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("CREATE TABLE " + fullTableName + DDL); + } + + // Test with preserveOnLimitExceeded=true + Properties props = new Properties(); + props.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, String.valueOf(maxMutationSize)); + props.setProperty(QueryServices.PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB, "true"); + + try (PhoenixConnection conn = + (PhoenixConnection) DriverManager.getConnection(getUrl(), props)) { + conn.setAutoCommit(false); + + PreparedStatement stmt = conn.prepareStatement( + "UPSERT INTO " + fullTableName + " (organization_id, entity_id, score) VALUES (?,?,?)"); + + int totalRows = 12; + int processed = 0; + int commitCount = 0; + + while (processed < totalRows) { + try { + for (int i = processed; i < totalRows; i++) { + stmt.setString(1, "ORG" + String.format("%011d", i)); + stmt.setString(2, "ENT" + String.format("%011d", i)); + stmt.setInt(3, i); + stmt.executeUpdate(); + processed = i + 1; + } + conn.commit(); + commitCount++; + break; // All done + } catch (MutationLimitReachedException e) { + // Verify the exception is the expected type + assertEquals(SQLExceptionCode.MUTATION_LIMIT_REACHED.getErrorCode(), e.getErrorCode()); + + // Verify mutations were preserved - MutationState should have rows + MutationState state = conn.getMutationState(); + assertTrue("Mutations should be preserved", state.getNumRows() > 0); + + // Commit what we have so far + conn.commit(); + commitCount++; + // Loop continues from 'processed' index + } + } + + // Should have required multiple commits due to limit + assertTrue("Should have committed multiple times", commitCount > 1); + + // 
Verify all rows were inserted + try (ResultSet rs = conn.createStatement() + .executeQuery("SELECT COUNT(*) FROM " + fullTableName)) { + assertTrue(rs.next()); + assertEquals(totalRows, rs.getInt(1)); + } + } + } + + /** + * Tests that when preserveOnLimitExceeded=true, executeBatch() throws + * MutationLimitBatchException with proper checkpoint info, allowing recovery. + */ + @Test + public void testExecuteBatchPreserveOnLimitExceeded() throws Exception { + String fullTableName = generateUniqueName(); + int maxMutationSize = 5; + + // Create table + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("CREATE TABLE " + fullTableName + DDL); + } + + // Test with preserveOnLimitExceeded=true and autoCommit=false + Properties props = new Properties(); + props.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, String.valueOf(maxMutationSize)); + props.setProperty(QueryServices.PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB, "true"); + + try (PhoenixConnection conn = + (PhoenixConnection) DriverManager.getConnection(getUrl(), props)) { + conn.setAutoCommit(false); + + PreparedStatement stmt = conn.prepareStatement( + "UPSERT INTO " + fullTableName + " (organization_id, entity_id, score) VALUES (?,?,?)"); + + int totalRows = 15; + int commitCount = 0; + + // Add all rows to batch + for (int i = 0; i < totalRows; i++) { + stmt.setString(1, "ORG" + String.format("%011d", i)); + stmt.setString(2, "ENT" + String.format("%011d", i)); + stmt.setInt(3, i); + stmt.addBatch(); + } + + // Execute batch - should hit limit and throw MutationLimitBatchException + while (true) { + try { + stmt.executeBatch(); + conn.commit(); + commitCount++; + break; // All done + } catch (MutationLimitBatchException e) { + // Verify we got processedCount + assertTrue("ProcessedCount should be > 0", e.getProcessedCount() > 0); + + // Commit what was successfully buffered + conn.commit(); + commitCount++; + // executeBatch() trims the batch, so just retry + } + } + + // Should have required multiple commits + assertTrue("Should have committed multiple times", commitCount > 1); + + // Verify all rows were inserted + try (ResultSet rs = conn.createStatement() + .executeQuery("SELECT COUNT(*) FROM " + fullTableName)) { + assertTrue(rs.next()); + assertEquals(totalRows, rs.getInt(1)); + } + } + } + + /** + * Tests executeBatch() with autoCommit=true and preserveOnLimitExceeded=true. + * In this case, executeBatch() should auto-commit on limit and trim batch. 
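+ * Because executeBatch() commits the preserved mutations itself when autoCommit is true,
+ * the caller needs no explicit commit() before retrying the trimmed batch.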
+ */ + @Test + public void testExecuteBatchAutoCommitPreserveOnLimitExceeded() throws Exception { + String fullTableName = generateUniqueName(); + int maxMutationSize = 5; + + // Create table + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("CREATE TABLE " + fullTableName + DDL); + } + + // Test with preserveOnLimitExceeded=true and autoCommit=true + Properties props = new Properties(); + props.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, String.valueOf(maxMutationSize)); + props.setProperty(QueryServices.PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB, "true"); + + try (PhoenixConnection conn = + (PhoenixConnection) DriverManager.getConnection(getUrl(), props)) { + conn.setAutoCommit(true); // autoCommit is true + + PreparedStatement stmt = conn.prepareStatement( + "UPSERT INTO " + fullTableName + " (organization_id, entity_id, score) VALUES (?,?,?)"); + + int totalRows = 15; + int exceptionCount = 0; + + // Add all rows to batch + for (int i = 0; i < totalRows; i++) { + stmt.setString(1, "ORG" + String.format("%011d", i)); + stmt.setString(2, "ENT" + String.format("%011d", i)); + stmt.setInt(3, i); + stmt.addBatch(); + } + + // Execute batch - with autoCommit=true, it should auto-commit and trim + while (true) { + try { + stmt.executeBatch(); + break; // All done + } catch (MutationLimitBatchException e) { + exceptionCount++; + // With autoCommit=true, mutations were already committed + // Batch is trimmed, just retry + } + } + + // Should have hit limit at least once + assertTrue("Should have hit limit at least once", exceptionCount > 0); + + // Verify all rows were inserted + try (ResultSet rs = conn.createStatement() + .executeQuery("SELECT COUNT(*) FROM " + fullTableName)) { + assertTrue(rs.next()); + assertEquals(totalRows, rs.getInt(1)); + } + } + } + + /** + * Tests that when preserveOnLimitExceeded=false (default), the old behavior + * is maintained - mutations are cleared on limit exceeded. 
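+ * No checkpoint recovery is possible in this mode: once the exception is thrown, every
+ * row buffered since the last commit must be replayed by the caller.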
+ */ + @Test + public void testExecuteUpdateDefaultBehaviorClearsMutations() throws Exception { + String fullTableName = generateUniqueName(); + int maxMutationSize = 5; + + // Create table + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("CREATE TABLE " + fullTableName + DDL); + } + + // Test with preserveOnLimitExceeded=false (default) + Properties props = new Properties(); + props.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, String.valueOf(maxMutationSize)); + // Not setting PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB, should default to false + + try (PhoenixConnection conn = + (PhoenixConnection) DriverManager.getConnection(getUrl(), props)) { + conn.setAutoCommit(false); + + PreparedStatement stmt = conn.prepareStatement( + "UPSERT INTO " + fullTableName + " (organization_id, entity_id, score) VALUES (?,?,?)"); + + try { + for (int i = 0; i < 20; i++) { + stmt.setString(1, "ORG" + String.format("%011d", i)); + stmt.setString(2, "ENT" + String.format("%011d", i)); + stmt.setInt(3, i); + stmt.executeUpdate(); + } + fail("Should have thrown MaxMutationSizeExceededException"); + } catch (SQLException e) { + // Should be the old exception type, not MutationLimitReachedException + assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED.getErrorCode(), e.getErrorCode()); + + // Verify mutations were cleared (old behavior) + MutationState state = conn.getMutationState(); + assertEquals("Mutations should be cleared", 0, state.getNumRows()); + } + } + } + + /** + * Tests byte size limit with preserveOnLimitExceeded=true. + */ + @Test + public void testExecuteUpdatePreserveOnByteLimitExceeded() throws Exception { + String fullTableName = generateUniqueName(); + + // Create table with a VARCHAR column for large values + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("CREATE TABLE " + fullTableName + DDL); + } + + // Test with preserveOnLimitExceeded=true and low byte limit + // Each row is approximately 900+ bytes, so set limit to allow ~3-4 rows + Properties props = new Properties(); + props.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "1000000"); // High row limit + props.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "3000"); // Low byte limit (~3-4 rows) + props.setProperty(QueryServices.PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB, "true"); + + try (PhoenixConnection conn = + (PhoenixConnection) DriverManager.getConnection(getUrl(), props)) { + conn.setAutoCommit(false); + + PreparedStatement stmt = conn.prepareStatement( + "UPSERT INTO " + fullTableName + " (organization_id, entity_id, score, tags) VALUES (?,?,?,?)"); + + int totalRows = 10; + int processed = 0; + int commitCount = 0; + String largeValue = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; // ~36 bytes per row in tags + + while (processed < totalRows) { + try { + for (int i = processed; i < totalRows; i++) { + stmt.setString(1, "ORG" + String.format("%011d", i)); + stmt.setString(2, "ENT" + String.format("%011d", i)); + stmt.setInt(3, i); + stmt.setString(4, largeValue + i); + stmt.executeUpdate(); + processed = i + 1; + } + conn.commit(); + commitCount++; + break; + } catch (MutationLimitReachedException e) { + assertEquals(SQLExceptionCode.MUTATION_LIMIT_REACHED.getErrorCode(), e.getErrorCode()); + conn.commit(); + commitCount++; + } + } + + // Should have required multiple commits due to byte limit + assertTrue("Should have committed multiple times due to byte limit", commitCount > 1); + + // Verify all rows 
were inserted + try (ResultSet rs = conn.createStatement() + .executeQuery("SELECT COUNT(*) FROM " + fullTableName)) { + assertTrue(rs.next()); + assertEquals(totalRows, rs.getInt(1)); + } + } + } + @Test public void testMutationEstimatedSize() throws Exception { PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection(getUrl()); From a53324c451b52aee5197b2f9db302438ceea9f1f Mon Sep 17 00:00:00 2001 From: Hari Dara Date: Thu, 5 Feb 2026 19:01:10 +0530 Subject: [PATCH 2/8] Self review cleanup and refactoring --- .../apache/phoenix/execute/MutationState.java | 107 +++++++++--------- .../phoenix/util/PhoenixKeyValueUtil.java | 29 +---- .../phoenix/end2end/MutationStateIT.java | 65 +++++++++++ 3 files changed, 118 insertions(+), 83 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java index 69653170c7f..eea45555ac5 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -280,14 +280,20 @@ Maps.> newHashMapWithExpectedSize(5), subT public MutationState(TableRef table, MultiRowMutationState mutations, long sizeOffset, int maxSize, long maxSizeBytes, PhoenixConnection connection) throws SQLException { this(maxSize, maxSizeBytes, connection, false, null, sizeOffset); + + int incomingRows = mutations.size(); + long incomingBytes = PhoenixKeyValueUtil.calculateMultiRowMutationSize(mutations); + + // Check limit before updating state - at this point this.numRows=0 and this.estimatedSize=0, + // so checkLimit knows there's no existing state to preserve + checkLimit(incomingRows, incomingBytes); + if (!mutations.isEmpty()) { addMutations(this.mutationsMap, table, mutations); } - this.numRows = mutations.size(); - this.estimatedSize = - PhoenixKeyValueUtil.getEstimatedRowMutationSizeWithBatch(this.mutationsMap); - throwIfTooBig(); + this.numRows = incomingRows; + this.estimatedSize = incomingBytes; } // add a new batch of row mutations @@ -305,15 +311,21 @@ private void addMutations(Map> mutationMap private void removeMutations(Map> mutationMap, TableRef table) { List batches = mutationMap.get(table); - if (batches == null || batches.isEmpty()) { - mutationMap.remove(table); - return; - } + if (batches != null && !batches.isEmpty()) { + // mutation batches are committed in FIFO order so always remove from the head + MultiRowMutationState removed = batches.remove(0); + + // Update counts only for data tables + if (table.getTable().getType() == PTableType.TABLE) { + numRows -= removed.size(); + // Use calculateMultiRowMutationSize to include row key sizes, consistent with + // how estimatedSize is calculated via getEstimatedRowMutationSizeWithBatch + estimatedSize -= PhoenixKeyValueUtil.calculateMultiRowMutationSize(removed); + } - // mutation batches are committed in FIFO order so always remove from the head - batches.remove(0); - if (batches.isEmpty()) { - mutationMap.remove(table); + if (batches.isEmpty()) { + mutationMap.remove(table); + } } } @@ -501,22 +513,36 @@ public static MutationState emptyMutationState(int maxSize, long maxSizeBytes, return state; } - private void throwIfTooBig() throws SQLException { - if (numRows > maxSize || estimatedSize > maxSizeBytes) { - if (preserveOnLimitExceeded) { + /** + * Checks if adding the incoming mutations would exceed the configured limits. 
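+ * The limits are evaluated against the combined (buffered + incoming) totals before any
+ * state is modified, so in preserve mode a failed check leaves the existing buffer intact.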
+ * + * @param incomingRows number of rows being added + * @param incomingBytes estimated size in bytes being added + * @throws MutationLimitReachedException if preserveOnLimitExceeded is true and there is + * existing state to preserve (this.numRows > 0 or this.estimatedSize > 0) + * @throws MaxMutationSizeExceededException if row limit exceeded and no state to preserve + * @throws MaxMutationSizeBytesExceededException if byte limit exceeded and no state to preserve + */ + private void checkLimit(int incomingRows, long incomingBytes) throws SQLException { + long combinedRows = this.numRows + incomingRows; + long combinedBytes = this.estimatedSize + incomingBytes; + + if (combinedRows > maxSize || combinedBytes > maxSizeBytes) { + // Only use preserve mode when there's existing state to preserve. + // Check if this MutationState already has mutations buffered. + boolean hasExistingState = this.numRows > 0 || this.estimatedSize > 0; + if (preserveOnLimitExceeded && hasExistingState) { // Preserve mode: throw new exception WITHOUT resetting state throw new MutationLimitReachedException(); } else { // Legacy mode: reset state and throw old exceptions - if (numRows > maxSize) { - int mutationSize = numRows; + if (combinedRows > maxSize) { resetState(); - throw new MaxMutationSizeExceededException(maxSize, mutationSize); + throw new MaxMutationSizeExceededException(maxSize, (int) combinedRows); } - if (estimatedSize > maxSizeBytes) { - long mutationSizeByte = estimatedSize; + if (combinedBytes > maxSizeBytes) { resetState(); - throw new MaxMutationSizeBytesExceededException(maxSizeBytes, mutationSizeByte); + throw new MaxMutationSizeBytesExceededException(maxSizeBytes, combinedBytes); } } } @@ -630,22 +656,6 @@ private void joinMutationState(Map> srcMut } } - /** - * Pre-checks if joining the given MutationState would exceed configured limits. - * Throws MutationLimitReachedException if limits would be exceeded. - * Does NOT modify any state - this is a read-only check. - * - * @param newMutationState the state that would be joined - * @throws MutationLimitReachedException if joining would exceed limits - */ - private void checkLimitBeforeJoin(MutationState newMutationState) - throws MutationLimitReachedException { - if (this.numRows + newMutationState.numRows > maxSize || - this.estimatedSize + newMutationState.estimatedSize > maxSizeBytes) { - throw new MutationLimitReachedException(); - } - } - /** * Combine a newer mutation with this one, where in the event of overlaps, the newer one will take * precedence. Combine any metrics collected for the newer mutation. the newer mutation state @@ -655,11 +665,10 @@ public void join(MutationState newMutationState) throws SQLException { return; } - // Pre-check: if preserveOnLimitExceeded is enabled, check before joining - // to avoid merging mutations that would exceed the limit - if (preserveOnLimitExceeded) { - checkLimitBeforeJoin(newMutationState); - } + // Check limit before joining to ensure combined state won't exceed limits. 
+ // For preserveOnLimitExceeded=true: throws MutationLimitReachedException, state preserved + // For preserveOnLimitExceeded=false (legacy): throws old exceptions, state is reset + checkLimit(newMutationState.getNumRows(), newMutationState.getEstimatedSize()); phoenixTransactionContext.join(newMutationState.getPhoenixTransactionContext()); @@ -677,12 +686,6 @@ public void join(MutationState newMutationState) throws SQLException { } else if (readMetricQueue != null && newMutationState.readMetricQueue != null) { readMetricQueue.combineReadMetrics(newMutationState.readMetricQueue); } - - // Post-check: only run in legacy mode (when preserveOnLimitExceeded is false) - // In preserve mode, we trust the pre-check and skip this - if (!preserveOnLimitExceeded) { - throwIfTooBig(); - } } private static ImmutableBytesPtr getNewRowKeyWithRowTimestamp(ImmutableBytesPtr ptr, @@ -1655,14 +1658,8 @@ private IOException updateTableRegionCacheIfNecessary(IOException ioe) { shouldRetry = false; numFailedMutations = 0; - // Remove batches as we process them + // Remove batches as we process them (also updates numRows and estimatedSize for data tables) removeMutations(this.mutationsMap, origTableRef); - if (tableInfo.isDataTable()) { - numRows -= numMutations; - // recalculate the estimated size - estimatedSize = - PhoenixKeyValueUtil.getEstimatedRowMutationSizeWithBatch(this.mutationsMap); - } areAllBatchesSuccessful = true; } catch (Exception e) { long serverTimestamp = ClientUtil.parseServerTimestamp(e); diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java index 4f4bbffa5e8..8177f54b2b7 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java @@ -173,33 +173,6 @@ public static long calculateMutationDiskSize(Mutation m) { return size; } - /** - * Estimates the storage size of a row - * @param tableMutationMap map from table to row to RowMutationState - * @return estimated row size - */ - public static long - getEstimatedRowMutationSize(Map tableMutationMap) { - long size = 0; - // iterate over table - for (Entry tableEntry : tableMutationMap.entrySet()) { - size += calculateMultiRowMutationSize(tableEntry.getValue()); - } - return size; - } - - public static long getEstimatedRowMutationSizeWithBatch( - Map> tableMutationMap) { - long size = 0; - // iterate over table - for (Entry> tableEntry : tableMutationMap.entrySet()) { - for (MultiRowMutationState batch : tableEntry.getValue()) { - size += calculateMultiRowMutationSize(batch); - } - } - return size; - } - /** * If c is not a KeyValue, cast it to KeyValue and return it. 
If c is a KeyValue, just return it * @param c cell @@ -267,7 +240,7 @@ public static List maybeCopyCellList(List cells) { return cells; } - private static long calculateMultiRowMutationSize(MultiRowMutationState mutations) { + public static long calculateMultiRowMutationSize(MultiRowMutationState mutations) { long size = 0; // iterate over rows for (Entry rowEntry : mutations.entrySet()) { diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java index c6f18be6a90..8b1c37d90c5 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java @@ -48,6 +48,7 @@ import org.apache.phoenix.jdbc.MutationLimitBatchException; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.MaxMutationSizeExceededException; import org.apache.phoenix.schema.MutationLimitReachedException; import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PTableType; @@ -621,6 +622,70 @@ public void testExecuteUpdatePreserveOnByteLimitExceeded() throws Exception { } } + /** + * Tests that when preserveOnLimitExceeded is true but a single mutation exceeds + * the limit (constructor case), we still get the old exception type since there's + * no existing state to preserve. + */ + @Test + public void testSingleMutationExceedsLimitWithPreserveOption() throws Exception { + String fullTableName = generateUniqueName(); + String sourceTable = generateUniqueName(); + + // Use a connection without limits to create tables and set up test data + try (Connection setupConn = DriverManager.getConnection(getUrl())) { + setupConn.setAutoCommit(true); + try (Statement stmt = setupConn.createStatement()) { + stmt.execute("CREATE TABLE " + fullTableName + DDL); + stmt.execute("CREATE TABLE " + sourceTable + " (id VARCHAR PRIMARY KEY, val INTEGER)"); + // Insert multiple rows into source table + stmt.executeUpdate("UPSERT INTO " + sourceTable + " VALUES ('a', 1)"); + stmt.executeUpdate("UPSERT INTO " + sourceTable + " VALUES ('b', 2)"); + } + } + + // Now use a connection with the limit set + Properties props = new Properties(); + // Set limit to 1 row so any operation producing multiple rows would exceed it + props.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "1"); + props.setProperty(QueryServices.PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB, "true"); + + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.setAutoCommit(false); + + // Insert a row to have some existing state in the connection's MutationState + try (PreparedStatement stmt = conn.prepareStatement( + "UPSERT INTO " + fullTableName + + " (organization_id, entity_id, score) VALUES (?, ?, ?)")) { + stmt.setString(1, "org1"); + stmt.setString(2, "entity1"); + stmt.setInt(3, 1); + stmt.executeUpdate(); + } + + // Now do a SELECT-based UPSERT that produces multiple rows in a single mutation. + // This creates a new MutationState in the constructor that already exceeds the limit. + // Since there's no state to preserve in that NEW MutationState (only 1 row was inserted + // into the connection's existing state), we should get the old exception type + // (MaxMutationSizeExceededException), not MutationLimitReachedException. 
+ try (Statement stmt = conn.createStatement()) { + try { + stmt.executeUpdate( + "UPSERT INTO " + fullTableName + + " (organization_id, entity_id, score) " + + "SELECT id, 'e', val FROM " + sourceTable); + fail("Expected MaxMutationSizeExceededException"); + } catch (MaxMutationSizeExceededException e) { + // Expected - legacy exception even though preserveOnLimitExceeded is true, + // because the single mutation (from constructor) has no prior state to preserve + } catch (MutationLimitReachedException e) { + fail("Should have thrown MaxMutationSizeExceededException, not MutationLimitReachedException. " + + "When a single mutation exceeds the limit, there's no state to preserve."); + } + } + } + } + @Test public void testMutationEstimatedSize() throws Exception { PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection(getUrl()); From 735ef35da8941a5c8a2939111bdce10bfe11a002 Mon Sep 17 00:00:00 2001 From: Hari Dara Date: Fri, 6 Feb 2026 14:00:15 +0530 Subject: [PATCH 3/8] Self review cleanup and refactoring 2 --- .../apache/phoenix/execute/MutationState.java | 185 +++++++++--------- .../phoenix/util/PhoenixKeyValueUtil.java | 14 +- 2 files changed, 109 insertions(+), 90 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java index eea45555ac5..e6dad10e164 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -280,20 +280,14 @@ Maps.> newHashMapWithExpectedSize(5), subT public MutationState(TableRef table, MultiRowMutationState mutations, long sizeOffset, int maxSize, long maxSizeBytes, PhoenixConnection connection) throws SQLException { this(maxSize, maxSizeBytes, connection, false, null, sizeOffset); - - int incomingRows = mutations.size(); - long incomingBytes = PhoenixKeyValueUtil.calculateMultiRowMutationSize(mutations); - - // Check limit before updating state - at this point this.numRows=0 and this.estimatedSize=0, - // so checkLimit knows there's no existing state to preserve - checkLimit(incomingRows, incomingBytes); - if (!mutations.isEmpty()) { addMutations(this.mutationsMap, table, mutations); } + this.numRows = mutations.size(); + this.estimatedSize = + PhoenixKeyValueUtil.getEstimatedRowMutationSizeWithBatch(this.mutationsMap); - this.numRows = incomingRows; - this.estimatedSize = incomingBytes; + throwIfTooBig(); } // add a new batch of row mutations @@ -311,21 +305,15 @@ private void addMutations(Map> mutationMap private void removeMutations(Map> mutationMap, TableRef table) { List batches = mutationMap.get(table); - if (batches != null && !batches.isEmpty()) { - // mutation batches are committed in FIFO order so always remove from the head - MultiRowMutationState removed = batches.remove(0); - - // Update counts only for data tables - if (table.getTable().getType() == PTableType.TABLE) { - numRows -= removed.size(); - // Use calculateMultiRowMutationSize to include row key sizes, consistent with - // how estimatedSize is calculated via getEstimatedRowMutationSizeWithBatch - estimatedSize -= PhoenixKeyValueUtil.calculateMultiRowMutationSize(removed); - } + if (batches == null || batches.isEmpty()) { + mutationMap.remove(table); + return; + } - if (batches.isEmpty()) { - mutationMap.remove(table); - } + // mutation batches are committed in FIFO order so always remove from the head + batches.remove(0); + 
if (batches.isEmpty()) { + mutationMap.remove(table); } } @@ -513,38 +501,28 @@ public static MutationState emptyMutationState(int maxSize, long maxSizeBytes, return state; } + private void throwIfTooBig() throws SQLException { + if (numRows > maxSize) { + int mutationSize = numRows; + resetState(); + throw new MaxMutationSizeExceededException(maxSize, mutationSize); + } + if (estimatedSize > maxSizeBytes) { + long mutationSizeByte = estimatedSize; + resetState(); + throw new MaxMutationSizeBytesExceededException(maxSizeBytes, mutationSizeByte); + } + } + /** - * Checks if adding the incoming mutations would exceed the configured limits. - * - * @param incomingRows number of rows being added - * @param incomingBytes estimated size in bytes being added - * @throws MutationLimitReachedException if preserveOnLimitExceeded is true and there is - * existing state to preserve (this.numRows > 0 or this.estimatedSize > 0) - * @throws MaxMutationSizeExceededException if row limit exceeded and no state to preserve - * @throws MaxMutationSizeBytesExceededException if byte limit exceeded and no state to preserve + * Pre-check for preserve mode: throws MutationLimitReachedException if adding the given + * rows/bytes would exceed limits. Does NOT modify state - caller should only proceed with + * modification if this method returns normally. */ - private void checkLimit(int incomingRows, long incomingBytes) throws SQLException { - long combinedRows = this.numRows + incomingRows; - long combinedBytes = this.estimatedSize + incomingBytes; - - if (combinedRows > maxSize || combinedBytes > maxSizeBytes) { - // Only use preserve mode when there's existing state to preserve. - // Check if this MutationState already has mutations buffered. - boolean hasExistingState = this.numRows > 0 || this.estimatedSize > 0; - if (preserveOnLimitExceeded && hasExistingState) { - // Preserve mode: throw new exception WITHOUT resetting state - throw new MutationLimitReachedException(); - } else { - // Legacy mode: reset state and throw old exceptions - if (combinedRows > maxSize) { - resetState(); - throw new MaxMutationSizeExceededException(maxSize, (int) combinedRows); - } - if (combinedBytes > maxSizeBytes) { - resetState(); - throw new MaxMutationSizeBytesExceededException(maxSizeBytes, combinedBytes); - } - } + private void throwIfLimitWouldBeExceeded(int additionalRows, long additionalBytes) + throws SQLException { + if (numRows + additionalRows > maxSize || estimatedSize + additionalBytes > maxSizeBytes) { + throw new MutationLimitReachedException(); } } @@ -578,7 +556,7 @@ public void clearResult() { } private void joinMutationState(TableRef tableRef, MultiRowMutationState srcRows, - Map> dstMutations) { + Map> dstMutations) throws SQLException { PTable table = tableRef.getTable(); boolean isIndex = table.getType() == PTableType.INDEX; boolean incrementRowCount = dstMutations == this.mutationsMap; @@ -587,6 +565,10 @@ private void joinMutationState(TableRef tableRef, MultiRowMutationState srcRows, MultiRowMutationState existingRows = getLastMutationBatch(dstMutations, tableRef); if (existingRows == null) { // no rows found for this table + // For preserve mode, check limits BEFORE modifying any state + if (incrementRowCount && !isIndex && preserveOnLimitExceeded) { + throwIfLimitWouldBeExceeded(srcRows.size(), srcRows.estimatedSize); + } // Size new map at batch size as that's what it'll likely grow to. 
MultiRowMutationState newRows = new MultiRowMutationState(connection.getMutateBatchSize()); newRows.putAll(srcRows); @@ -610,11 +592,16 @@ private void joinMutationState(TableRef tableRef, MultiRowMutationState srcRows, RowMutationState newRowMutationState = rowEntry.getValue(); RowMutationState existingRowMutationState = existingRows.get(key); if (existingRowMutationState == null) { + // For preserve mode, check limits BEFORE modifying any state + long newRowSize = newRowMutationState.calculateEstimatedSize(); + if (incrementRowCount && !isIndex && preserveOnLimitExceeded) { + throwIfLimitWouldBeExceeded(1, newRowSize); + } existingRows.put(key, newRowMutationState); if (incrementRowCount && !isIndex) { // Don't count index rows in row count numRows++; // increment estimated size by the size of the new row - estimatedSize += newRowMutationState.calculateEstimatedSize(); + estimatedSize += newRowSize; } continue; } @@ -622,13 +609,12 @@ private void joinMutationState(TableRef tableRef, MultiRowMutationState srcRows, Map newValues = newRowMutationState.getColumnValues(); if (existingValues != PRow.DELETE_MARKER && newValues != PRow.DELETE_MARKER) { // Check if we can merge existing column values with new column values - long beforeMergeSize = existingRowMutationState.calculateEstimatedSize(); - boolean isMerged = existingRowMutationState.join(rowEntry.getValue()); - if (isMerged) { - // decrement estimated size by the size of the old row - estimatedSize -= beforeMergeSize; - // increment estimated size by the size of the new row - estimatedSize += existingRowMutationState.calculateEstimatedSize(); + // For preserve mode, pass this so join() can check limits before modification + Long sizeDiff = existingRowMutationState.join(newRowMutationState, + preserveOnLimitExceeded ? this : null); + if (sizeDiff != null) { + // Merged successfully (row count unchanged - same row key) + estimatedSize += sizeDiff; } else { // cannot merge regular upsert and conditional upsert // conflicting row is not a new row so no need to increment numRows @@ -645,7 +631,7 @@ private void joinMutationState(TableRef tableRef, MultiRowMutationState srcRows, } private void joinMutationState(Map> srcMutations, - Map> dstMutations) { + Map> dstMutations) throws SQLException { // Merge newMutation with this one, keeping state from newMutation for any overlaps for (Map.Entry> entry : srcMutations.entrySet()) { TableRef tableRef = entry.getKey(); @@ -665,11 +651,6 @@ public void join(MutationState newMutationState) throws SQLException { return; } - // Check limit before joining to ensure combined state won't exceed limits. 
- // For preserveOnLimitExceeded=true: throws MutationLimitReachedException, state preserved - // For preserveOnLimitExceeded=false (legacy): throws old exceptions, state is reset - checkLimit(newMutationState.getNumRows(), newMutationState.getEstimatedSize()); - phoenixTransactionContext.join(newMutationState.getPhoenixTransactionContext()); this.sizeOffset += newMutationState.sizeOffset; @@ -686,6 +667,7 @@ public void join(MutationState newMutationState) throws SQLException { } else if (readMetricQueue != null && newMutationState.readMetricQueue != null) { readMetricQueue.combineReadMetrics(newMutationState.readMetricQueue); } + throwIfTooBig(); } private static ImmutableBytesPtr getNewRowKeyWithRowTimestamp(ImmutableBytesPtr ptr, @@ -1654,12 +1636,17 @@ private IOException updateTableRegionCacheIfNecessary(IOException ioe) { "Sent batch of " + mutationBatch.size() + " for " + Bytes.toString(htableName)); } child.stop(); - child.stop(); shouldRetry = false; numFailedMutations = 0; - // Remove batches as we process them (also updates numRows and estimatedSize for data tables) + // Remove batches as we process them removeMutations(this.mutationsMap, origTableRef); + if (tableInfo.isDataTable()) { + numRows -= numMutations; + // recalculate the estimated size + estimatedSize = + PhoenixKeyValueUtil.getEstimatedRowMutationSizeWithBatch(this.mutationsMap); + } areAllBatchesSuccessful = true; } catch (Exception e) { long serverTimestamp = ClientUtil.parseServerTimestamp(e); @@ -2445,36 +2432,56 @@ int[] getStatementIndexes() { /** * Join the newRow with the current row if it doesn't conflict with it. A regular upsert - * conflicts with a conditional upsert - * @return True if the rows were successfully joined else False + * conflicts with a conditional upsert. + * @param mutationState if non-null, checks limits before modification and throws + * MutationLimitReachedException if size increase would exceed limits + * @return the size change (can be 0, positive, or negative) if merged, or null if conflicting */ - boolean join(RowMutationState newRow) { + Long join(RowMutationState newRow, MutationState mutationState) throws SQLException { if (isConflicting(newRow)) { - return false; + return null; } - // If we already have a row and the new row has an ON DUPLICATE KEY clause - // ignore the new values (as that's what the server will do). + + // Pre-compute merged results (no side effects - these return new objects) + byte[] combinedOnDupKey = + PhoenixIndexBuilderHelper.combineOnDupKey(this.onDupKeyBytes, newRow.onDupKeyBytes); + int[] mergedIndexes = joinSortedIntArrays(statementIndexes, newRow.getStatementIndexes()); + + // Calculate column values size change + long colValuesSizeDiff = 0; if (newRow.onDupKeyBytes == null) { - // increment the column value size by the new row column value size - colValuesSize += newRow.colValuesSize; + colValuesSizeDiff = newRow.colValuesSize; for (Map.Entry entry : newRow.columnValues.entrySet()) { - PColumn col = entry.getKey(); - byte[] oldValue = columnValues.put(col, entry.getValue()); + byte[] oldValue = columnValues.get(entry.getKey()); if (oldValue != null) { - // decrement column value size by the size of all column values that were replaced - colValuesSize -= (col.getEstimatedSize() + oldValue.length); + colValuesSizeDiff -= (entry.getKey().getEstimatedSize() + oldValue.length); } } } - // Concatenate ON DUPLICATE KEY bytes to allow multiple - // increments of the same row in the same commit batch. 
- this.onDupKeyBytes = - PhoenixIndexBuilderHelper.combineOnDupKey(this.onDupKeyBytes, newRow.onDupKeyBytes); + + // Total size change (can be negative) + long totalSizeDiff = colValuesSizeDiff + + ((combinedOnDupKey != null ? combinedOnDupKey.length : 0) + - (this.onDupKeyBytes != null ? this.onDupKeyBytes.length : 0)) + + (mergedIndexes.length - statementIndexes.length) * SizedUtil.INT_SIZE; + + // Check limit BEFORE any modification (row count unchanged for merge - same row key) + if (mutationState != null) { + mutationState.throwIfLimitWouldBeExceeded(0, totalSizeDiff); + } + + // Apply modifications + this.onDupKeyBytes = combinedOnDupKey; + this.statementIndexes = mergedIndexes; if (newRow.onDupKeyType == OnDuplicateKeyType.UPDATE_ONLY) { this.onDupKeyType = OnDuplicateKeyType.UPDATE_ONLY; } - statementIndexes = joinSortedIntArrays(statementIndexes, newRow.getStatementIndexes()); - return true; + if (newRow.onDupKeyBytes == null) { + columnValues.putAll(newRow.columnValues); + colValuesSize += colValuesSizeDiff; + } + + return totalSizeDiff; } @Nonnull diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java index 8177f54b2b7..83acb1dfa71 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java @@ -240,7 +240,19 @@ public static List maybeCopyCellList(List cells) { return cells; } - public static long calculateMultiRowMutationSize(MultiRowMutationState mutations) { + public static long getEstimatedRowMutationSizeWithBatch( + Map> tableMutationMap) { + long size = 0; + // iterate over table + for (Entry> tableEntry : tableMutationMap.entrySet()) { + for (MultiRowMutationState batch : tableEntry.getValue()) { + size += calculateMultiRowMutationSize(batch); + } + } + return size; + } + + private static long calculateMultiRowMutationSize(MultiRowMutationState mutations) { long size = 0; // iterate over rows for (Entry rowEntry : mutations.entrySet()) { From df5da229578f8fc8b5da41235b649a460f6b3d67 Mon Sep 17 00:00:00 2001 From: Hari Dara Date: Fri, 6 Feb 2026 18:27:02 +0530 Subject: [PATCH 4/8] Additional test coverage --- .../phoenix/end2end/MutationStateIT.java | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java index 8b1c37d90c5..7f9c23c9fba 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java @@ -686,6 +686,82 @@ public void testSingleMutationExceedsLimitWithPreserveOption() throws Exception } } + /** + * Tests that when preserveOnLimitExceeded=true and the limit is reached during + * row merge (same row key updated multiple times), the mutations are preserved. + * This specifically tests the 3rd case in joinMutationState where rows are merged. 
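+ * Because the merges reuse the same row key, numRows stays at 1 while estimatedSize grows
+ * with each RowMutationState.join(), so only the byte limit can trip in this test.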
+ */ + @Test + public void testRowMergePreserveOnLimitExceeded() throws Exception { + String fullTableName = generateUniqueName(); + + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("CREATE TABLE " + fullTableName + DDL); + } + + // Set byte limit low enough that merging rows with increasing sizes will trigger it + Properties props = new Properties(); + props.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "1000"); + props.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "1500"); + props.setProperty(QueryServices.PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB, "true"); + + try (PhoenixConnection conn = + (PhoenixConnection) DriverManager.getConnection(getUrl(), props)) { + conn.setAutoCommit(false); + + PreparedStatement stmt = conn.prepareStatement( + "UPSERT INTO " + fullTableName + + " (organization_id, entity_id, score, tags) VALUES (?,?,?,?)"); + + int commitCount = 0; + int totalUpdates = 0; + String baseKey = "ORG000000000001"; + String entityKey = "ENT000000000001"; + + // Keep updating the SAME row with larger values until limit reached + // Each update merges with the existing row, increasing the size + while (totalUpdates < 50) { + try { + for (int i = totalUpdates; i < 50; i++) { + stmt.setString(1, baseKey); + stmt.setString(2, entityKey); + stmt.setInt(3, i); + // Increasing size with each update - this will eventually exceed byte limit + StringBuilder sb = new StringBuilder("update" + i + "_"); + for (int j = 0; j < i * 10; j++) { + sb.append("X"); + } + stmt.setString(4, sb.toString()); + stmt.executeUpdate(); + totalUpdates = i + 1; + } + conn.commit(); + commitCount++; + break; + } catch (MutationLimitReachedException e) { + // Mutations should be preserved - verify we have state + MutationState state = conn.getMutationState(); + assertTrue("Mutations should be preserved on limit exceeded", + state.getNumRows() > 0 || state.getEstimatedSize() > 0); + conn.commit(); + commitCount++; + } + } + + // Should have hit limit and committed multiple times due to row merge size growth + assertTrue("Should have committed at least once", commitCount >= 1); + + // Verify the row exists with the latest value + try (ResultSet rs = conn.createStatement() + .executeQuery("SELECT score FROM " + fullTableName + + " WHERE organization_id = '" + baseKey + "' AND entity_id = '" + entityKey + "'")) { + assertTrue("Row should exist", rs.next()); + // Score should be the last successfully committed value + assertTrue("Score should be set", rs.getInt(1) >= 0); + } + } + } + @Test public void testMutationEstimatedSize() throws Exception { PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection(getUrl()); From 4c9f35fa42247005981197379801e272f9491afe Mon Sep 17 00:00:00 2001 From: Hari Dara Date: Mon, 9 Feb 2026 11:28:32 +0530 Subject: [PATCH 5/8] Self review, fix gaps --- .../apache/phoenix/execute/MutationState.java | 27 ++-- .../phoenix/end2end/MutationStateIT.java | 119 ++++++++++++++++++ 2 files changed, 139 insertions(+), 7 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java index e6dad10e164..080a98a1773 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -516,8 +516,7 @@ private void throwIfTooBig() throws SQLException { /** * Pre-check for preserve mode: 
throws MutationLimitReachedException if adding the given - * rows/bytes would exceed limits. Does NOT modify state - caller should only proceed with - * modification if this method returns normally. + * rows/bytes would exceed limits, which prevents the caller from modifying the state. */ private void throwIfLimitWouldBeExceeded(int additionalRows, long additionalBytes) throws SQLException { @@ -560,20 +559,21 @@ private void joinMutationState(TableRef tableRef, MultiRowMutationState srcRows, PTable table = tableRef.getTable(); boolean isIndex = table.getType() == PTableType.INDEX; boolean incrementRowCount = dstMutations == this.mutationsMap; + boolean impactsLimits = incrementRowCount && !isIndex; // we only need to check if the new mutation batch (srcRows) conflicts with the // last mutation batch since we try to merge it with that only MultiRowMutationState existingRows = getLastMutationBatch(dstMutations, tableRef); if (existingRows == null) { // no rows found for this table // For preserve mode, check limits BEFORE modifying any state - if (incrementRowCount && !isIndex && preserveOnLimitExceeded) { + if (impactsLimits && preserveOnLimitExceeded) { throwIfLimitWouldBeExceeded(srcRows.size(), srcRows.estimatedSize); } // Size new map at batch size as that's what it'll likely grow to. MultiRowMutationState newRows = new MultiRowMutationState(connection.getMutateBatchSize()); newRows.putAll(srcRows); addMutations(dstMutations, tableRef, newRows); - if (incrementRowCount && !isIndex) { + if (impactsLimits) { numRows += srcRows.size(); // if we added all the rows from newMutationState we can just increment the // estimatedSize by newMutationState.estimatedSize @@ -585,6 +585,7 @@ private void joinMutationState(TableRef tableRef, MultiRowMutationState srcRows, // for conflicting rows MultiRowMutationState conflictingRows = new MultiRowMutationState(connection.getMutateBatchSize()); + long conflictingRowsTotalSize = 0; // Rows for this table already exist, check for conflicts for (Map.Entry rowEntry : srcRows.entrySet()) { @@ -594,17 +595,18 @@ private void joinMutationState(TableRef tableRef, MultiRowMutationState srcRows, if (existingRowMutationState == null) { // For preserve mode, check limits BEFORE modifying any state long newRowSize = newRowMutationState.calculateEstimatedSize(); - if (incrementRowCount && !isIndex && preserveOnLimitExceeded) { + if (impactsLimits && preserveOnLimitExceeded) { throwIfLimitWouldBeExceeded(1, newRowSize); } existingRows.put(key, newRowMutationState); - if (incrementRowCount && !isIndex) { // Don't count index rows in row count + if (impactsLimits) { // Don't count index rows in row count numRows++; // increment estimated size by the size of the new row estimatedSize += newRowSize; } continue; } + Map existingValues = existingRowMutationState.getColumnValues(); Map newValues = newRowMutationState.getColumnValues(); if (existingValues != PRow.DELETE_MARKER && newValues != PRow.DELETE_MARKER) { @@ -617,8 +619,17 @@ private void joinMutationState(TableRef tableRef, MultiRowMutationState srcRows, estimatedSize += sizeDiff; } else { // cannot merge regular upsert and conditional upsert - // conflicting row is not a new row so no need to increment numRows + // conflicting row goes into a separate batch (same key, different semantics) + // Row count unchanged (same key), but size increases + long conflictingRowSize = newRowMutationState.calculateEstimatedSize(); + if (impactsLimits && preserveOnLimitExceeded) { + // Include already-accumulated conflicting rows 
size in the check + throwIfLimitWouldBeExceeded(0, conflictingRowsTotalSize + conflictingRowSize); + } conflictingRows.put(key, newRowMutationState); + if (impactsLimits) { + conflictingRowsTotalSize += conflictingRowSize; + } } } else { existingRows.put(key, newRowMutationState); @@ -627,6 +638,8 @@ private void joinMutationState(TableRef tableRef, MultiRowMutationState srcRows, if (!conflictingRows.isEmpty()) { addMutations(dstMutations, tableRef, conflictingRows); + // Update estimatedSize only after actual state change + estimatedSize += conflictingRowsTotalSize; } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java index 7f9c23c9fba..38443167b51 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java @@ -762,6 +762,96 @@ public void testRowMergePreserveOnLimitExceeded() throws Exception { } } + /** + * Tests that when preserveOnLimitExceeded=true and the limit is reached during + * conflicting row addition (regular upsert vs ON DUPLICATE KEY upsert on same key), + * the mutations are preserved. This tests the conflict case in joinMutationState. + */ + @Test + public void testConflictingRowsPreserveOnLimitExceeded() throws Exception { + String fullTableName = generateUniqueName(); + + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("CREATE TABLE " + fullTableName + DDL); + } + + // Set byte limit low enough that adding conflicting rows will trigger it + Properties props = new Properties(); + props.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "1000"); + props.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "2000"); + props.setProperty(QueryServices.PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB, "true"); + + try (PhoenixConnection conn = + (PhoenixConnection) DriverManager.getConnection(getUrl(), props)) { + conn.setAutoCommit(false); + + // Regular UPSERT statement + PreparedStatement regularStmt = conn.prepareStatement( + "UPSERT INTO " + fullTableName + + " (organization_id, entity_id, score, tags) VALUES (?,?,?,?)"); + + // ON DUPLICATE KEY UPSERT statement - conflicts with regular upsert + PreparedStatement onDupKeyStmt = conn.prepareStatement( + "UPSERT INTO " + fullTableName + + " (organization_id, entity_id, score) VALUES (?,?,?)" + + " ON DUPLICATE KEY UPDATE score = score + 1"); + + int commitCount = 0; + int totalRows = 0; + String orgKey = "ORG000000000001"; + + // Alternate between regular upserts and ON DUPLICATE KEY upserts on same keys + // This creates conflicting rows that can't be merged + while (totalRows < 30) { + try { + for (int i = totalRows; i < 30; i++) { + String entityKey = String.format("ENT%012d", i); + + // First do a regular upsert + regularStmt.setString(1, orgKey); + regularStmt.setString(2, entityKey); + regularStmt.setInt(3, i); + StringBuilder sb = new StringBuilder("data" + i + "_"); + for (int j = 0; j < 20; j++) { + sb.append("X"); + } + regularStmt.setString(4, sb.toString()); + regularStmt.executeUpdate(); + + // Then do an ON DUPLICATE KEY upsert on the same key - this conflicts + onDupKeyStmt.setString(1, orgKey); + onDupKeyStmt.setString(2, entityKey); + onDupKeyStmt.setInt(3, i * 10); + onDupKeyStmt.executeUpdate(); + + totalRows = i + 1; + } + conn.commit(); + commitCount++; + break; + } catch (MutationLimitReachedException e) { + // Mutations should be preserved 
- verify we have state + MutationState state = conn.getMutationState(); + assertTrue("Mutations should be preserved on limit exceeded", + state.getNumRows() > 0 || state.getEstimatedSize() > 0); + conn.commit(); + commitCount++; + } + } + + // Should have hit limit and committed multiple times due to conflicting row size + assertTrue("Should have committed at least once", commitCount >= 1); + + // Verify rows exist + try (ResultSet rs = conn.createStatement() + .executeQuery("SELECT COUNT(*) FROM " + fullTableName + + " WHERE organization_id = '" + orgKey + "'")) { + assertTrue("Should have results", rs.next()); + assertTrue("Should have inserted rows", rs.getInt(1) > 0); + } + } + } + @Test public void testMutationEstimatedSize() throws Exception { PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection(getUrl()); @@ -829,6 +919,35 @@ public void testMutationEstimatedSize() throws Exception { stmt.execute(); assertTrue("Mutation state size should decrease", prevEstimatedSize + 4 > state.getEstimatedSize()); + + // Test that estimatedSize increases when adding conflicting rows + // (regular upsert + ON DUPLICATE KEY upsert on same key cannot be merged) + conn.commit(); // clear state + assertEquals("Mutation state size should be zero after commit", 0, state.getEstimatedSize()); + + // Regular upsert + stmt = conn.prepareStatement( + "upsert into " + fullTableName + " (organization_id, entity_id, score) values (?,?,?)"); + stmt.setString(1, "AAAA"); + stmt.setString(2, "BBBB"); + stmt.setInt(3, 1); + stmt.execute(); + long sizeAfterRegularUpsert = state.getEstimatedSize(); + assertTrue("Mutation state size should be > 0 after regular upsert", + sizeAfterRegularUpsert > 0); + + // ON DUPLICATE KEY upsert on same key - creates a conflicting row that can't be merged + PreparedStatement onDupStmt = conn.prepareStatement( + "upsert into " + fullTableName + " (organization_id, entity_id, score) values (?,?,?)" + + " ON DUPLICATE KEY UPDATE score = score + 1"); + onDupStmt.setString(1, "AAAA"); + onDupStmt.setString(2, "BBBB"); + onDupStmt.setInt(3, 2); + onDupStmt.execute(); + + // Size should increase because conflicting row goes into a separate batch + assertTrue("Estimated size should increase for conflicting row", + state.getEstimatedSize() > sizeAfterRegularUpsert); } @Test From e733ad955ad2bfa9dfd9d8f844d698401c9add67 Mon Sep 17 00:00:00 2001 From: Hari Dara Date: Mon, 9 Feb 2026 20:42:43 +0530 Subject: [PATCH 6/8] Self review, minor improvements and narrow down diff --- .../apache/phoenix/execute/MutationState.java | 4 +- .../apache/phoenix/jdbc/PhoenixStatement.java | 5 +- .../phoenix/util/PhoenixKeyValueUtil.java | 24 ++-- .../phoenix/end2end/MutationStateIT.java | 116 +++++++----------- 4 files changed, 61 insertions(+), 88 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java index 080a98a1773..d36eccce7f4 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -610,8 +610,8 @@ private void joinMutationState(TableRef tableRef, MultiRowMutationState srcRows, Map existingValues = existingRowMutationState.getColumnValues(); Map newValues = newRowMutationState.getColumnValues(); if (existingValues != PRow.DELETE_MARKER && newValues != PRow.DELETE_MARKER) { - // Check if we can merge existing column values with new 
column values - // For preserve mode, pass this so join() can check limits before modification + // Check if we can merge existing column values with new column values. + // For preserve mode, pass this instance so join() can check limits before modification. Long sizeDiff = existingRowMutationState.join(newRowMutationState, preserveOnLimitExceeded ? this : null); if (sizeDiff != null) { diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index 5a1ca86f607..34db75c55b5 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -2464,10 +2464,7 @@ public int[] executeBatch() throws SQLException { // Trim the batch list to contain only unprocessed items (from index i to end) if (i > 0) { - List remaining = - new ArrayList<>(batch.subList(i, batch.size())); - batch.clear(); - batch.addAll(remaining); + batch.subList(0, i).clear(); } // Mark the failed index diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java index 83acb1dfa71..a2070712e06 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java @@ -173,6 +173,18 @@ public static long calculateMutationDiskSize(Mutation m) { return size; } + public static long getEstimatedRowMutationSizeWithBatch( + Map> tableMutationMap) { + long size = 0; + // iterate over table + for (Entry> tableEntry : tableMutationMap.entrySet()) { + for (MultiRowMutationState batch : tableEntry.getValue()) { + size += calculateMultiRowMutationSize(batch); + } + } + return size; + } + /** * If c is not a KeyValue, cast it to KeyValue and return it. If c is a KeyValue, just return it * @param c cell @@ -240,18 +252,6 @@ public static List maybeCopyCellList(List cells) { return cells; } - public static long getEstimatedRowMutationSizeWithBatch( - Map> tableMutationMap) { - long size = 0; - // iterate over table - for (Entry> tableEntry : tableMutationMap.entrySet()) { - for (MultiRowMutationState batch : tableEntry.getValue()) { - size += calculateMultiRowMutationSize(batch); - } - } - return size; - } - private static long calculateMultiRowMutationSize(MultiRowMutationState mutations) { long size = 0; // iterate over rows diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java index 38443167b51..776b9f2271b 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java @@ -84,6 +84,33 @@ private void upsertRows(PhoenixConnection conn, String fullTableName) throws SQL } } + /** + * Helper to create Properties for preserve mode tests. 
+ * @param maxRows max mutation row count limit + * @param maxBytes max mutation byte size limit (0 to skip setting) + */ + private Properties createPreserveModeProps(int maxRows, long maxBytes) { + Properties props = new Properties(); + props.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, String.valueOf(maxRows)); + if (maxBytes > 0) { + props.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, String.valueOf(maxBytes)); + } + props.setProperty(QueryServices.PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB, "true"); + return props; + } + + /** + * Helper to verify the row count in a table. + */ + private void verifyRowCount(Connection conn, String tableName, int expected) + throws SQLException { + try (ResultSet rs = conn.createStatement() + .executeQuery("SELECT COUNT(*) FROM " + tableName)) { + assertTrue("Should have results", rs.next()); + assertEquals("Row count mismatch", expected, rs.getInt(1)); + } + } + public static String randString(int length) { return new BigInteger(164, RAND).toString().substring(0, length); } @@ -331,13 +358,8 @@ public void testExecuteUpdatePreserveOnLimitExceeded() throws Exception { conn.createStatement().execute("CREATE TABLE " + fullTableName + DDL); } - // Test with preserveOnLimitExceeded=true - Properties props = new Properties(); - props.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, String.valueOf(maxMutationSize)); - props.setProperty(QueryServices.PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB, "true"); - - try (PhoenixConnection conn = - (PhoenixConnection) DriverManager.getConnection(getUrl(), props)) { + try (PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection( + getUrl(), createPreserveModeProps(maxMutationSize, 0))) { conn.setAutoCommit(false); PreparedStatement stmt = conn.prepareStatement( @@ -377,12 +399,7 @@ public void testExecuteUpdatePreserveOnLimitExceeded() throws Exception { // Should have required multiple commits due to limit assertTrue("Should have committed multiple times", commitCount > 1); - // Verify all rows were inserted - try (ResultSet rs = conn.createStatement() - .executeQuery("SELECT COUNT(*) FROM " + fullTableName)) { - assertTrue(rs.next()); - assertEquals(totalRows, rs.getInt(1)); - } + verifyRowCount(conn, fullTableName, totalRows); } } @@ -400,13 +417,8 @@ public void testExecuteBatchPreserveOnLimitExceeded() throws Exception { conn.createStatement().execute("CREATE TABLE " + fullTableName + DDL); } - // Test with preserveOnLimitExceeded=true and autoCommit=false - Properties props = new Properties(); - props.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, String.valueOf(maxMutationSize)); - props.setProperty(QueryServices.PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB, "true"); - - try (PhoenixConnection conn = - (PhoenixConnection) DriverManager.getConnection(getUrl(), props)) { + try (PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection( + getUrl(), createPreserveModeProps(maxMutationSize, 0))) { conn.setAutoCommit(false); PreparedStatement stmt = conn.prepareStatement( @@ -444,12 +456,7 @@ public void testExecuteBatchPreserveOnLimitExceeded() throws Exception { // Should have required multiple commits assertTrue("Should have committed multiple times", commitCount > 1); - // Verify all rows were inserted - try (ResultSet rs = conn.createStatement() - .executeQuery("SELECT COUNT(*) FROM " + fullTableName)) { - assertTrue(rs.next()); - assertEquals(totalRows, rs.getInt(1)); - } + verifyRowCount(conn, fullTableName, totalRows); } } @@ -467,13 +474,8 @@ public 
void testExecuteBatchAutoCommitPreserveOnLimitExceeded() throws Exception conn.createStatement().execute("CREATE TABLE " + fullTableName + DDL); } - // Test with preserveOnLimitExceeded=true and autoCommit=true - Properties props = new Properties(); - props.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, String.valueOf(maxMutationSize)); - props.setProperty(QueryServices.PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB, "true"); - - try (PhoenixConnection conn = - (PhoenixConnection) DriverManager.getConnection(getUrl(), props)) { + try (PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection( + getUrl(), createPreserveModeProps(maxMutationSize, 0))) { conn.setAutoCommit(true); // autoCommit is true PreparedStatement stmt = conn.prepareStatement( @@ -505,12 +507,7 @@ public void testExecuteBatchAutoCommitPreserveOnLimitExceeded() throws Exception // Should have hit limit at least once assertTrue("Should have hit limit at least once", exceptionCount > 0); - // Verify all rows were inserted - try (ResultSet rs = conn.createStatement() - .executeQuery("SELECT COUNT(*) FROM " + fullTableName)) { - assertTrue(rs.next()); - assertEquals(totalRows, rs.getInt(1)); - } + verifyRowCount(conn, fullTableName, totalRows); } } @@ -571,15 +568,9 @@ public void testExecuteUpdatePreserveOnByteLimitExceeded() throws Exception { conn.createStatement().execute("CREATE TABLE " + fullTableName + DDL); } - // Test with preserveOnLimitExceeded=true and low byte limit - // Each row is approximately 900+ bytes, so set limit to allow ~3-4 rows - Properties props = new Properties(); - props.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "1000000"); // High row limit - props.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "3000"); // Low byte limit (~3-4 rows) - props.setProperty(QueryServices.PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB, "true"); - - try (PhoenixConnection conn = - (PhoenixConnection) DriverManager.getConnection(getUrl(), props)) { + // High row limit, low byte limit (~3-4 rows at ~900+ bytes each) + try (PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection( + getUrl(), createPreserveModeProps(1000000, 3000))) { conn.setAutoCommit(false); PreparedStatement stmt = conn.prepareStatement( @@ -613,12 +604,7 @@ public void testExecuteUpdatePreserveOnByteLimitExceeded() throws Exception { // Should have required multiple commits due to byte limit assertTrue("Should have committed multiple times due to byte limit", commitCount > 1); - // Verify all rows were inserted - try (ResultSet rs = conn.createStatement() - .executeQuery("SELECT COUNT(*) FROM " + fullTableName)) { - assertTrue(rs.next()); - assertEquals(totalRows, rs.getInt(1)); - } + verifyRowCount(conn, fullTableName, totalRows); } } @@ -699,14 +685,9 @@ public void testRowMergePreserveOnLimitExceeded() throws Exception { conn.createStatement().execute("CREATE TABLE " + fullTableName + DDL); } - // Set byte limit low enough that merging rows with increasing sizes will trigger it - Properties props = new Properties(); - props.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "1000"); - props.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "1500"); - props.setProperty(QueryServices.PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB, "true"); - - try (PhoenixConnection conn = - (PhoenixConnection) DriverManager.getConnection(getUrl(), props)) { + // Low byte limit so merging rows with increasing sizes triggers limit + try (PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection( + 
getUrl(), createPreserveModeProps(1000, 1500))) { conn.setAutoCommit(false); PreparedStatement stmt = conn.prepareStatement( @@ -775,14 +756,9 @@ public void testConflictingRowsPreserveOnLimitExceeded() throws Exception { conn.createStatement().execute("CREATE TABLE " + fullTableName + DDL); } - // Set byte limit low enough that adding conflicting rows will trigger it - Properties props = new Properties(); - props.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "1000"); - props.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "2000"); - props.setProperty(QueryServices.PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB, "true"); - - try (PhoenixConnection conn = - (PhoenixConnection) DriverManager.getConnection(getUrl(), props)) { + // Low byte limit so adding conflicting rows triggers limit + try (PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection( + getUrl(), createPreserveModeProps(1000, 2000))) { conn.setAutoCommit(false); // Regular UPSERT statement From 5db555983e97a025dbc45a91a8dcb3d2ea6894c8 Mon Sep 17 00:00:00 2001 From: Hari Dara Date: Tue, 10 Feb 2026 10:12:49 +0530 Subject: [PATCH 7/8] Self review, minor improvements to the tests --- .../phoenix/end2end/MutationStateIT.java | 62 ++++++++----------- 1 file changed, 26 insertions(+), 36 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java index 776b9f2271b..88444bbb74f 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java @@ -349,9 +349,9 @@ public void testUpsertMaxMutationSize() throws Exception { * allowing the client to commit and retry. 
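The contract spelled out in this Javadoc is what a caller-side ingest loop relies on. A minimal sketch of such a loop, assuming a Phoenix JDBC url, a pre-created TEST_TABLE with the same columns as these tests, the hypothetical helper name loadWithRetry, and the obvious java.sql and Phoenix imports; illustrative only, not part of the patch:

  // Commit-and-retry on MutationLimitReachedException (preserve mode).
  static void loadWithRetry(String url, List<String> entityIds) throws SQLException {
    Properties props = new Properties();
    props.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "5");
    props.setProperty(QueryServices.PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB, "true");
    try (Connection conn = DriverManager.getConnection(url, props);
        PreparedStatement stmt = conn.prepareStatement(
          "UPSERT INTO TEST_TABLE (organization_id, entity_id, score) VALUES (?,?,?)")) {
      conn.setAutoCommit(false);
      for (int i = 0; i < entityIds.size(); i++) {
        stmt.setString(1, "ORG1");
        stmt.setString(2, entityIds.get(i));
        stmt.setInt(3, i);
        try {
          stmt.executeUpdate();
        } catch (MutationLimitReachedException e) {
          conn.commit();        // buffered mutations survive the exception; flush them
          stmt.executeUpdate(); // retry only the statement that hit the limit
        }
      }
      conn.commit();
    }
  }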
    */
   @Test
-  public void testExecuteUpdatePreserveOnLimitExceeded() throws Exception {
+  public void testExecuteUpdatePreserveOnRowCountLimitExceeded() throws Exception {
     String fullTableName = generateUniqueName();
-    int maxMutationSize = 5;
+    int maxRows = 5;
 
     // Create table
     try (Connection conn = DriverManager.getConnection(getUrl())) {
@@ -359,7 +359,7 @@ public void testExecuteUpdatePreserveOnLimitExceeded() throws Exception {
     }
 
     try (PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection(
-        getUrl(), createPreserveModeProps(maxMutationSize, 0))) {
+        getUrl(), createPreserveModeProps(maxRows, 0))) {
       conn.setAutoCommit(false);
 
       PreparedStatement stmt = conn.prepareStatement(
@@ -397,8 +397,7 @@ public void testExecuteUpdatePreserveOnLimitExceeded() throws Exception {
       }
 
       // Should have required multiple commits due to limit
-      assertTrue("Should have committed multiple times", commitCount > 1);
-
+      assertEquals("Should have committed exactly 3 times", 3, commitCount);
       verifyRowCount(conn, fullTableName, totalRows);
     }
   }
@@ -410,7 +409,7 @@ public void testExecuteUpdatePreserveOnLimitExceeded() throws Exception {
   @Test
   public void testExecuteBatchPreserveOnLimitExceeded() throws Exception {
     String fullTableName = generateUniqueName();
-    int maxMutationSize = 5;
+    int maxRows = 5;
 
     // Create table
     try (Connection conn = DriverManager.getConnection(getUrl())) {
@@ -418,7 +417,7 @@ public void testExecuteBatchPreserveOnLimitExceeded() throws Exception {
     }
 
     try (PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection(
-        getUrl(), createPreserveModeProps(maxMutationSize, 0))) {
+        getUrl(), createPreserveModeProps(maxRows, 0))) {
       conn.setAutoCommit(false);
 
       PreparedStatement stmt = conn.prepareStatement(
@@ -454,8 +453,7 @@ public void testExecuteBatchPreserveOnLimitExceeded() throws Exception {
       }
 
       // Should have required multiple commits
-      assertTrue("Should have committed multiple times", commitCount > 1);
-
+      assertEquals("Should have committed exactly 3 times", 3, commitCount);
       verifyRowCount(conn, fullTableName, totalRows);
     }
   }
@@ -467,7 +465,7 @@ public void testExecuteBatchPreserveOnLimitExceeded() throws Exception {
   @Test
   public void testExecuteBatchAutoCommitPreserveOnLimitExceeded() throws Exception {
     String fullTableName = generateUniqueName();
-    int maxMutationSize = 5;
+    int maxRows = 5;
 
     // Create table
     try (Connection conn = DriverManager.getConnection(getUrl())) {
@@ -475,7 +473,7 @@ public void testExecuteBatchAutoCommitPreserveOnLimitExceeded() throws Exception
     }
 
     try (PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection(
-        getUrl(), createPreserveModeProps(maxMutationSize, 0))) {
+        getUrl(), createPreserveModeProps(maxRows, 0))) {
       conn.setAutoCommit(true); // autoCommit is true
 
       PreparedStatement stmt = conn.prepareStatement(
@@ -506,7 +504,6 @@ public void testExecuteBatchAutoCommitPreserveOnLimitExceeded() throws Exception
 
       // Should have hit limit at least once
      assertTrue("Should have hit limit at least once", exceptionCount > 0);
-
       verifyRowCount(conn, fullTableName, totalRows);
     }
   }
@@ -518,7 +515,7 @@ public void testExecuteBatchAutoCommitPreserveOnLimitExceeded() throws Exception
   @Test
   public void testExecuteUpdateDefaultBehaviorClearsMutations() throws Exception {
     String fullTableName = generateUniqueName();
-    int maxMutationSize = 5;
+    int maxRows = 5;
 
     // Create table
     try (Connection conn = DriverManager.getConnection(getUrl())) {
@@ -527,7 +524,7 @@ public void testExecuteUpdateDefaultBehaviorClearsMutations() throws Exception {
// Test with preserveOnLimitExceeded=false (default) Properties props = new Properties(); - props.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, String.valueOf(maxMutationSize)); + props.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, String.valueOf(maxRows)); // Not setting PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB, should default to false try (PhoenixConnection conn = @@ -551,8 +548,9 @@ public void testExecuteUpdateDefaultBehaviorClearsMutations() throws Exception { // Verify mutations were cleared (old behavior) MutationState state = conn.getMutationState(); - assertEquals("Mutations should be cleared", 0, state.getNumRows()); + assertEquals("Mutations should have been cleared", 0, state.getNumRows()); } + verifyRowCount(conn, fullTableName, 0); } } @@ -603,7 +601,6 @@ public void testExecuteUpdatePreserveOnByteLimitExceeded() throws Exception { // Should have required multiple commits due to byte limit assertTrue("Should have committed multiple times due to byte limit", commitCount > 1); - verifyRowCount(conn, fullTableName, totalRows); } } @@ -631,12 +628,8 @@ public void testSingleMutationExceedsLimitWithPreserveOption() throws Exception } // Now use a connection with the limit set - Properties props = new Properties(); - // Set limit to 1 row so any operation producing multiple rows would exceed it - props.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "1"); - props.setProperty(QueryServices.PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB, "true"); - - try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + try (PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection( + getUrl(), createPreserveModeProps(1, 0))) { conn.setAutoCommit(false); // Insert a row to have some existing state in the connection's MutationState @@ -651,8 +644,8 @@ public void testSingleMutationExceedsLimitWithPreserveOption() throws Exception // Now do a SELECT-based UPSERT that produces multiple rows in a single mutation. // This creates a new MutationState in the constructor that already exceeds the limit. - // Since there's no state to preserve in that NEW MutationState (only 1 row was inserted - // into the connection's existing state), we should get the old exception type + // Since there's no state to preserve in that NEW MutationState (only 1 row was inserted + // into the connection's existing state), we should get the old exception type // (MaxMutationSizeExceededException), not MutationLimitReachedException. 
try (Statement stmt = conn.createStatement()) { try { @@ -669,6 +662,8 @@ public void testSingleMutationExceedsLimitWithPreserveOption() throws Exception "When a single mutation exceeds the limit, there's no state to preserve."); } } + MutationState state = conn.getMutationState(); + assertEquals("Mutation should not have been cleared", 1, state.getNumRows()); } } @@ -698,12 +693,13 @@ public void testRowMergePreserveOnLimitExceeded() throws Exception { int totalUpdates = 0; String baseKey = "ORG000000000001"; String entityKey = "ENT000000000001"; + int maxScore = 50; // Keep updating the SAME row with larger values until limit reached // Each update merges with the existing row, increasing the size - while (totalUpdates < 50) { + while (totalUpdates <= maxScore) { try { - for (int i = totalUpdates; i < 50; i++) { + for (int i = totalUpdates; i <= maxScore; i++) { stmt.setString(1, baseKey); stmt.setString(2, entityKey); stmt.setInt(3, i); @@ -730,7 +726,7 @@ public void testRowMergePreserveOnLimitExceeded() throws Exception { } // Should have hit limit and committed multiple times due to row merge size growth - assertTrue("Should have committed at least once", commitCount >= 1); + assertTrue("Should have committed multiple times", commitCount > 1); // Verify the row exists with the latest value try (ResultSet rs = conn.createStatement() @@ -738,7 +734,7 @@ public void testRowMergePreserveOnLimitExceeded() throws Exception { + " WHERE organization_id = '" + baseKey + "' AND entity_id = '" + entityKey + "'")) { assertTrue("Row should exist", rs.next()); // Score should be the last successfully committed value - assertTrue("Score should be set", rs.getInt(1) >= 0); + assertEquals("Score should be set to the max value", maxScore, rs.getInt(1)); } } } @@ -816,15 +812,9 @@ public void testConflictingRowsPreserveOnLimitExceeded() throws Exception { } // Should have hit limit and committed multiple times due to conflicting row size - assertTrue("Should have committed at least once", commitCount >= 1); - + assertTrue("Should have committed multiple times", commitCount > 1); // Verify rows exist - try (ResultSet rs = conn.createStatement() - .executeQuery("SELECT COUNT(*) FROM " + fullTableName - + " WHERE organization_id = '" + orgKey + "'")) { - assertTrue("Should have results", rs.next()); - assertTrue("Should have inserted rows", rs.getInt(1) > 0); - } + verifyRowCount(conn, fullTableName, totalRows); } } From b641adedef90f965d3178b9b479ab55e6c59791a Mon Sep 17 00:00:00 2001 From: Hari Dara Date: Tue, 10 Feb 2026 10:23:16 +0530 Subject: [PATCH 8/8] ran spotless --- .../apache/phoenix/execute/MutationState.java | 19 ++- .../jdbc/MutationLimitBatchException.java | 49 ++++---- .../apache/phoenix/schema/MetaDataClient.java | 6 +- .../schema/MutationLimitReachedException.java | 20 ++-- .../phoenix/end2end/MutationStateIT.java | 110 ++++++++---------- 5 files changed, 93 insertions(+), 111 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java index d36eccce7f4..b5c7d8b5513 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -100,8 +100,8 @@ import org.apache.phoenix.schema.IllegalDataException; import org.apache.phoenix.schema.MaxMutationSizeBytesExceededException; import org.apache.phoenix.schema.MaxMutationSizeExceededException; -import 
org.apache.phoenix.schema.MutationLimitReachedException; import org.apache.phoenix.schema.MetaDataClient; +import org.apache.phoenix.schema.MutationLimitReachedException; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PMetaData; @@ -272,9 +272,8 @@ Maps.> newHashMapWithExpectedSize(5), subT this.serverSideImmutableIndexes = this.connection.getQueryServices().getConfiguration() .getBoolean(SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB, DEFAULT_SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED); - this.preserveOnLimitExceeded = this.connection.getQueryServices().getProps() - .getBoolean(PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB, - DEFAULT_PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED); + this.preserveOnLimitExceeded = this.connection.getQueryServices().getProps().getBoolean( + PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB, DEFAULT_PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED); } public MutationState(TableRef table, MultiRowMutationState mutations, long sizeOffset, @@ -612,8 +611,8 @@ private void joinMutationState(TableRef tableRef, MultiRowMutationState srcRows, if (existingValues != PRow.DELETE_MARKER && newValues != PRow.DELETE_MARKER) { // Check if we can merge existing column values with new column values. // For preserve mode, pass this instance so join() can check limits before modification. - Long sizeDiff = existingRowMutationState.join(newRowMutationState, - preserveOnLimitExceeded ? this : null); + Long sizeDiff = + existingRowMutationState.join(newRowMutationState, preserveOnLimitExceeded ? this : null); if (sizeDiff != null) { // Merged successfully (row count unchanged - same row key) estimatedSize += sizeDiff; @@ -2447,7 +2446,7 @@ int[] getStatementIndexes() { * Join the newRow with the current row if it doesn't conflict with it. A regular upsert * conflicts with a conditional upsert. * @param mutationState if non-null, checks limits before modification and throws - * MutationLimitReachedException if size increase would exceed limits + * MutationLimitReachedException if size increase would exceed limits * @return the size change (can be 0, positive, or negative) if merged, or null if conflicting */ Long join(RowMutationState newRow, MutationState mutationState) throws SQLException { @@ -2474,9 +2473,9 @@ Long join(RowMutationState newRow, MutationState mutationState) throws SQLExcept // Total size change (can be negative) long totalSizeDiff = colValuesSizeDiff - + ((combinedOnDupKey != null ? combinedOnDupKey.length : 0) - - (this.onDupKeyBytes != null ? this.onDupKeyBytes.length : 0)) - + (mergedIndexes.length - statementIndexes.length) * SizedUtil.INT_SIZE; + + ((combinedOnDupKey != null ? combinedOnDupKey.length : 0) + - (this.onDupKeyBytes != null ? 
this.onDupKeyBytes.length : 0))
+        + (mergedIndexes.length - statementIndexes.length) * SizedUtil.INT_SIZE;
 
     // Check limit BEFORE any modification (row count unchanged for merge - same row key)
     if (mutationState != null) {
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/MutationLimitBatchException.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/MutationLimitBatchException.java
index cf1ff620037..f532d109b09 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/MutationLimitBatchException.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/MutationLimitBatchException.java
@@ -21,37 +21,30 @@ import org.apache.phoenix.schema.MutationLimitReachedException;
 
 /**
- * Thrown from executeBatch() when the mutation buffer limit is reached.
- * The batch is automatically trimmed to contain only unprocessed items.
+ * Thrown from executeBatch() when the mutation buffer limit is reached. The batch is automatically
+ * trimmed to contain only unprocessed items.
  */
 public class MutationLimitBatchException extends BatchUpdateException {
-    private static final long serialVersionUID = 1L;
+  private static final long serialVersionUID = 1L;
 
-    private final int processedCount;
+  private final int processedCount;
 
-    /**
-     * @param updateCounts array of update counts for each statement in the batch
-     * @param cause the underlying MutationLimitReachedException
-     * @param processedCount number of statements successfully processed
-     */
-    public MutationLimitBatchException(
-        int[] updateCounts,
-        MutationLimitReachedException cause,
-        int processedCount) {
-        super(cause.getMessage(),
-            cause.getSQLState(),
-            cause.getErrorCode(),
-            updateCounts,
-            cause);
-        this.processedCount = processedCount;
-    }
+  /**
+   * @param updateCounts array of update counts for each statement in the batch
+   * @param cause the underlying MutationLimitReachedException
+   * @param processedCount number of statements successfully processed
+   */
+  public MutationLimitBatchException(int[] updateCounts, MutationLimitReachedException cause,
+    int processedCount) {
+    super(cause.getMessage(), cause.getSQLState(), cause.getErrorCode(), updateCounts, cause);
+    this.processedCount = processedCount;
+  }
 
-    /**
-     * Returns the number of statements that were successfully processed
-     * before the limit was reached. The batch has been trimmed to contain
-     * only the remaining unprocessed items.
-     */
-    public int getProcessedCount() {
-        return processedCount;
-    }
+  /**
+   * Returns the number of statements that were successfully processed before the limit was reached.
+   * The batch has been trimmed to contain only the remaining unprocessed items.
+ */ + public int getProcessedCount() { + return processedCount; + } } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 1b8e898bd3a..c2a46ca76d1 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -4866,9 +4866,9 @@ public MutationState addColumn(PTable table, List origColumnDefs, /** * To check if TTL is defined at any of the child below we are checking it at * {@link org.apache.phoenix.coprocessor.MetaDataEndpointImpl#mutateColumn(List, ColumnMutator, int, PTable, PTable, boolean)} - * level where in function - * {@link org.apache.phoenix.coprocessor.MetaDataEndpointImpl# validateIfMutationAllowedOnParent(PTable, List, PTableType, long, byte[], byte[], byte[], List, int)} - * we are already traversing through allDescendantViews. + * level where in function {@link org.apache.phoenix.coprocessor.MetaDataEndpointImpl# + * validateIfMutationAllowedOnParent(PTable, List, PTableType, long, byte[], byte[], + * byte[], List, int)} we are already traversing through allDescendantViews. */ } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MutationLimitReachedException.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MutationLimitReachedException.java index ac177160c04..d6d750d7745 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MutationLimitReachedException.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MutationLimitReachedException.java @@ -22,18 +22,16 @@ import org.apache.phoenix.exception.SQLExceptionInfo; /** - * Exception thrown when a mutation operation would exceed the configured - * mutation buffer size limit. Unlike MaxMutationSizeBytesExceededException, - * existing buffered mutations are preserved and can be committed before retrying. + * Exception thrown when a mutation operation would exceed the configured mutation buffer size + * limit. Unlike MaxMutationSizeBytesExceededException, existing buffered mutations are preserved + * and can be committed before retrying. */ public class MutationLimitReachedException extends SQLException { - private static final long serialVersionUID = 1L; - private static final SQLExceptionCode CODE = - SQLExceptionCode.MUTATION_LIMIT_REACHED; + private static final long serialVersionUID = 1L; + private static final SQLExceptionCode CODE = SQLExceptionCode.MUTATION_LIMIT_REACHED; - public MutationLimitReachedException() { - super(new SQLExceptionInfo.Builder(CODE).build().toString(), - CODE.getSQLState(), - CODE.getErrorCode()); - } + public MutationLimitReachedException() { + super(new SQLExceptionInfo.Builder(CODE).build().toString(), CODE.getSQLState(), + CODE.getErrorCode()); + } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java index 88444bbb74f..0b470acd144 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java @@ -86,7 +86,7 @@ private void upsertRows(PhoenixConnection conn, String fullTableName) throws SQL /** * Helper to create Properties for preserve mode tests. 
- * @param maxRows max mutation row count limit + * @param maxRows max mutation row count limit * @param maxBytes max mutation byte size limit (0 to skip setting) */ private Properties createPreserveModeProps(int maxRows, long maxBytes) { @@ -102,10 +102,8 @@ private Properties createPreserveModeProps(int maxRows, long maxBytes) { /** * Helper to verify the row count in a table. */ - private void verifyRowCount(Connection conn, String tableName, int expected) - throws SQLException { - try (ResultSet rs = conn.createStatement() - .executeQuery("SELECT COUNT(*) FROM " + tableName)) { + private void verifyRowCount(Connection conn, String tableName, int expected) throws SQLException { + try (ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + tableName)) { assertTrue("Should have results", rs.next()); assertEquals("Row count mismatch", expected, rs.getInt(1)); } @@ -345,8 +343,8 @@ public void testUpsertMaxMutationSize() throws Exception { /** * Tests that when preserveOnLimitExceeded=true, executeUpdate() throws - * MutationLimitReachedException without clearing buffered mutations, - * allowing the client to commit and retry. + * MutationLimitReachedException without clearing buffered mutations, allowing the client to + * commit and retry. */ @Test public void testExecuteUpdatePreserveOnRowCountLimitExceeded() throws Exception { @@ -358,8 +356,8 @@ public void testExecuteUpdatePreserveOnRowCountLimitExceeded() throws Exception conn.createStatement().execute("CREATE TABLE " + fullTableName + DDL); } - try (PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection( - getUrl(), createPreserveModeProps(maxRows, 0))) { + try (PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection(getUrl(), + createPreserveModeProps(maxRows, 0))) { conn.setAutoCommit(false); PreparedStatement stmt = conn.prepareStatement( @@ -403,8 +401,8 @@ public void testExecuteUpdatePreserveOnRowCountLimitExceeded() throws Exception } /** - * Tests that when preserveOnLimitExceeded=true, executeBatch() throws - * MutationLimitBatchException with proper checkpoint info, allowing recovery. + * Tests that when preserveOnLimitExceeded=true, executeBatch() throws MutationLimitBatchException + * with proper checkpoint info, allowing recovery. */ @Test public void testExecuteBatchPreserveOnLimitExceeded() throws Exception { @@ -416,8 +414,8 @@ public void testExecuteBatchPreserveOnLimitExceeded() throws Exception { conn.createStatement().execute("CREATE TABLE " + fullTableName + DDL); } - try (PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection( - getUrl(), createPreserveModeProps(maxRows, 0))) { + try (PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection(getUrl(), + createPreserveModeProps(maxRows, 0))) { conn.setAutoCommit(false); PreparedStatement stmt = conn.prepareStatement( @@ -459,8 +457,8 @@ public void testExecuteBatchPreserveOnLimitExceeded() throws Exception { } /** - * Tests executeBatch() with autoCommit=true and preserveOnLimitExceeded=true. - * In this case, executeBatch() should auto-commit on limit and trim batch. + * Tests executeBatch() with autoCommit=true and preserveOnLimitExceeded=true. In this case, + * executeBatch() should auto-commit on limit and trim batch. 
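Combined with getProcessedCount() on MutationLimitBatchException and the in-place trimming in PhoenixStatement (batch.subList(0, i).clear()), these two batch tests imply a simple drain loop for callers. A sketch, assuming a PhoenixConnection conn and a PreparedStatement stmt whose batch has already been filled; illustrative only:

  // After the exception the driver has trimmed the statement's batch to the
  // unprocessed items, so re-running executeBatch() resumes where it stopped.
  int processed = 0;
  while (true) {
    try {
      stmt.executeBatch();
      break;
    } catch (MutationLimitBatchException e) {
      processed += e.getProcessedCount(); // statements buffered before the limit
      if (!conn.getAutoCommit()) {
        conn.commit(); // autoCommit=false: flush the preserved mutations ourselves
      }
      // autoCommit=true: per the test above, executeBatch() already committed
    }
  }
  if (!conn.getAutoCommit()) {
    conn.commit(); // flush whatever the final executeBatch() buffered
  }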
*/ @Test public void testExecuteBatchAutoCommitPreserveOnLimitExceeded() throws Exception { @@ -472,8 +470,8 @@ public void testExecuteBatchAutoCommitPreserveOnLimitExceeded() throws Exception conn.createStatement().execute("CREATE TABLE " + fullTableName + DDL); } - try (PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection( - getUrl(), createPreserveModeProps(maxRows, 0))) { + try (PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection(getUrl(), + createPreserveModeProps(maxRows, 0))) { conn.setAutoCommit(true); // autoCommit is true PreparedStatement stmt = conn.prepareStatement( @@ -509,8 +507,8 @@ public void testExecuteBatchAutoCommitPreserveOnLimitExceeded() throws Exception } /** - * Tests that when preserveOnLimitExceeded=false (default), the old behavior - * is maintained - mutations are cleared on limit exceeded. + * Tests that when preserveOnLimitExceeded=false (default), the old behavior is maintained - + * mutations are cleared on limit exceeded. */ @Test public void testExecuteUpdateDefaultBehaviorClearsMutations() throws Exception { @@ -527,8 +525,8 @@ public void testExecuteUpdateDefaultBehaviorClearsMutations() throws Exception { props.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, String.valueOf(maxRows)); // Not setting PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB, should default to false - try (PhoenixConnection conn = - (PhoenixConnection) DriverManager.getConnection(getUrl(), props)) { + try ( + PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection(getUrl(), props)) { conn.setAutoCommit(false); PreparedStatement stmt = conn.prepareStatement( @@ -567,12 +565,12 @@ public void testExecuteUpdatePreserveOnByteLimitExceeded() throws Exception { } // High row limit, low byte limit (~3-4 rows at ~900+ bytes each) - try (PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection( - getUrl(), createPreserveModeProps(1000000, 3000))) { + try (PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection(getUrl(), + createPreserveModeProps(1000000, 3000))) { conn.setAutoCommit(false); - PreparedStatement stmt = conn.prepareStatement( - "UPSERT INTO " + fullTableName + " (organization_id, entity_id, score, tags) VALUES (?,?,?,?)"); + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + + " (organization_id, entity_id, score, tags) VALUES (?,?,?,?)"); int totalRows = 10; int processed = 0; @@ -606,9 +604,9 @@ public void testExecuteUpdatePreserveOnByteLimitExceeded() throws Exception { } /** - * Tests that when preserveOnLimitExceeded is true but a single mutation exceeds - * the limit (constructor case), we still get the old exception type since there's - * no existing state to preserve. + * Tests that when preserveOnLimitExceeded is true but a single mutation exceeds the limit + * (constructor case), we still get the old exception type since there's no existing state to + * preserve. 
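The practical consequence for callers is that only MutationLimitReachedException is retryable by committing first. A sketch of the resulting catch chain, under the semantics this test pins down; the table names are placeholders:

  String sql = "UPSERT INTO TARGET_TABLE SELECT * FROM SOURCE_TABLE"; // placeholder tables
  try {
    stmt.executeUpdate(sql);
  } catch (MutationLimitReachedException e) {
    conn.commit();           // earlier buffered work is intact: flush it, then retry
    stmt.executeUpdate(sql);
  } catch (MaxMutationSizeExceededException e) {
    // This single statement alone exceeds the limit, so commit-and-retry cannot
    // succeed; raise the limit or split the statement instead. In preserve mode
    // the connection's previously buffered mutations are still intact.
  }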
*/ @Test public void testSingleMutationExceedsLimitWithPreserveOption() throws Exception { @@ -628,14 +626,13 @@ public void testSingleMutationExceedsLimitWithPreserveOption() throws Exception } // Now use a connection with the limit set - try (PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection( - getUrl(), createPreserveModeProps(1, 0))) { + try (PhoenixConnection conn = + (PhoenixConnection) DriverManager.getConnection(getUrl(), createPreserveModeProps(1, 0))) { conn.setAutoCommit(false); // Insert a row to have some existing state in the connection's MutationState try (PreparedStatement stmt = conn.prepareStatement( - "UPSERT INTO " + fullTableName + - " (organization_id, entity_id, score) VALUES (?, ?, ?)")) { + "UPSERT INTO " + fullTableName + " (organization_id, entity_id, score) VALUES (?, ?, ?)")) { stmt.setString(1, "org1"); stmt.setString(2, "entity1"); stmt.setInt(3, 1); @@ -649,17 +646,16 @@ public void testSingleMutationExceedsLimitWithPreserveOption() throws Exception // (MaxMutationSizeExceededException), not MutationLimitReachedException. try (Statement stmt = conn.createStatement()) { try { - stmt.executeUpdate( - "UPSERT INTO " + fullTableName + - " (organization_id, entity_id, score) " + - "SELECT id, 'e', val FROM " + sourceTable); + stmt.executeUpdate("UPSERT INTO " + fullTableName + + " (organization_id, entity_id, score) " + "SELECT id, 'e', val FROM " + sourceTable); fail("Expected MaxMutationSizeExceededException"); } catch (MaxMutationSizeExceededException e) { // Expected - legacy exception even though preserveOnLimitExceeded is true, // because the single mutation (from constructor) has no prior state to preserve } catch (MutationLimitReachedException e) { - fail("Should have thrown MaxMutationSizeExceededException, not MutationLimitReachedException. " + - "When a single mutation exceeds the limit, there's no state to preserve."); + fail( + "Should have thrown MaxMutationSizeExceededException, not MutationLimitReachedException. " + + "When a single mutation exceeds the limit, there's no state to preserve."); } } MutationState state = conn.getMutationState(); @@ -668,9 +664,9 @@ public void testSingleMutationExceedsLimitWithPreserveOption() throws Exception } /** - * Tests that when preserveOnLimitExceeded=true and the limit is reached during - * row merge (same row key updated multiple times), the mutations are preserved. - * This specifically tests the 3rd case in joinMutationState where rows are merged. + * Tests that when preserveOnLimitExceeded=true and the limit is reached during row merge (same + * row key updated multiple times), the mutations are preserved. This specifically tests the 3rd + * case in joinMutationState where rows are merged. 
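For reference, the merge accounting this test drives can be observed directly on the connection's MutationState. A sketch assuming the same table and UPSERT statement as the test; illustrative only:

  MutationState state = conn.getMutationState();
  stmt.setString(1, "ORG1"); stmt.setString(2, "ENT1");
  stmt.setInt(3, 1); stmt.setString(4, "short");
  stmt.executeUpdate();
  int rowsBefore = state.getNumRows();
  long sizeBefore = state.getEstimatedSize();

  stmt.setInt(3, 2); stmt.setString(4, "a much longer tag value for the same row key");
  stmt.executeUpdate(); // merged with the buffered row via the join() shown in this patch

  assertEquals(rowsBefore, state.getNumRows());      // same key: row count unchanged
  assertTrue(state.getEstimatedSize() > sizeBefore); // size adjusted by the merge diff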
*/ @Test public void testRowMergePreserveOnLimitExceeded() throws Exception { @@ -681,13 +677,12 @@ public void testRowMergePreserveOnLimitExceeded() throws Exception { } // Low byte limit so merging rows with increasing sizes triggers limit - try (PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection( - getUrl(), createPreserveModeProps(1000, 1500))) { + try (PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection(getUrl(), + createPreserveModeProps(1000, 1500))) { conn.setAutoCommit(false); - PreparedStatement stmt = conn.prepareStatement( - "UPSERT INTO " + fullTableName - + " (organization_id, entity_id, score, tags) VALUES (?,?,?,?)"); + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + + " (organization_id, entity_id, score, tags) VALUES (?,?,?,?)"); int commitCount = 0; int totalUpdates = 0; @@ -729,9 +724,8 @@ public void testRowMergePreserveOnLimitExceeded() throws Exception { assertTrue("Should have committed multiple times", commitCount > 1); // Verify the row exists with the latest value - try (ResultSet rs = conn.createStatement() - .executeQuery("SELECT score FROM " + fullTableName - + " WHERE organization_id = '" + baseKey + "' AND entity_id = '" + entityKey + "'")) { + try (ResultSet rs = conn.createStatement().executeQuery("SELECT score FROM " + fullTableName + + " WHERE organization_id = '" + baseKey + "' AND entity_id = '" + entityKey + "'")) { assertTrue("Row should exist", rs.next()); // Score should be the last successfully committed value assertEquals("Score should be set to the max value", maxScore, rs.getInt(1)); @@ -740,9 +734,9 @@ public void testRowMergePreserveOnLimitExceeded() throws Exception { } /** - * Tests that when preserveOnLimitExceeded=true and the limit is reached during - * conflicting row addition (regular upsert vs ON DUPLICATE KEY upsert on same key), - * the mutations are preserved. This tests the conflict case in joinMutationState. + * Tests that when preserveOnLimitExceeded=true and the limit is reached during conflicting row + * addition (regular upsert vs ON DUPLICATE KEY upsert on same key), the mutations are preserved. + * This tests the conflict case in joinMutationState. */ @Test public void testConflictingRowsPreserveOnLimitExceeded() throws Exception { @@ -753,19 +747,17 @@ public void testConflictingRowsPreserveOnLimitExceeded() throws Exception { } // Low byte limit so adding conflicting rows triggers limit - try (PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection( - getUrl(), createPreserveModeProps(1000, 2000))) { + try (PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection(getUrl(), + createPreserveModeProps(1000, 2000))) { conn.setAutoCommit(false); // Regular UPSERT statement - PreparedStatement regularStmt = conn.prepareStatement( - "UPSERT INTO " + fullTableName - + " (organization_id, entity_id, score, tags) VALUES (?,?,?,?)"); + PreparedStatement regularStmt = conn.prepareStatement("UPSERT INTO " + fullTableName + + " (organization_id, entity_id, score, tags) VALUES (?,?,?,?)"); // ON DUPLICATE KEY UPSERT statement - conflicts with regular upsert PreparedStatement onDupKeyStmt = conn.prepareStatement( - "UPSERT INTO " + fullTableName - + " (organization_id, entity_id, score) VALUES (?,?,?)" + "UPSERT INTO " + fullTableName + " (organization_id, entity_id, score) VALUES (?,?,?)" + " ON DUPLICATE KEY UPDATE score = score + 1"); int commitCount = 0;
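The conflict staged by these two statements is the interesting part: a plain UPSERT and an ON DUPLICATE KEY UPSERT for the same row key cannot be merged, so the second is buffered as a separate batch. A compressed sketch of the effect, reusing regularStmt and onDupKeyStmt from this test; illustrative only:

  regularStmt.setString(1, "ORG000000000001");
  regularStmt.setString(2, "ENT000000000001");
  regularStmt.setInt(3, 1);
  regularStmt.setString(4, "tags");
  regularStmt.executeUpdate();
  long before = conn.getMutationState().getEstimatedSize();

  onDupKeyStmt.setString(1, "ORG000000000001");
  onDupKeyStmt.setString(2, "ENT000000000001");
  onDupKeyStmt.setInt(3, 10);
  onDupKeyStmt.executeUpdate(); // cannot merge: joins a new conflicting batch

  // Row count is unchanged (same key) but estimatedSize grows, which is why this
  // path can also throw MutationLimitReachedException in preserve mode.
  assertTrue(conn.getMutationState().getEstimatedSize() > before);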