From 2b32a38eeadedac9adadeb3ea1ffa95ad0a68558 Mon Sep 17 00:00:00 2001 From: Sh-Zh-7 Date: Sat, 21 Feb 2026 14:25:45 +0800 Subject: [PATCH 1/2] Support rank function in TopK optimization. --- .../it/db/it/IoTDBWindowFunction3IT.java | 80 +- .../operator/GroupedTopNRankAccumulator.java | 735 ++++++++++++++++++ .../operator/GroupedTopNRankBuilder.java | 219 ++++++ .../operator/RowIdComparisonHashStrategy.java | 8 + ...impleTsBlockWithPositionEqualsAndHash.java | 129 +++ .../operator/TopNPeerGroupLookup.java | 382 +++++++++ .../TsBlockWithPositionEqualsAndHash.java | 28 + .../process/window/TopKRankingOperator.java | 28 +- .../optimizations/SortElimination.java | 22 + .../TransformSortToStreamSort.java | 14 + .../window/TopKRankingOperatorTest.java | 696 +++++++++++++++++ .../WindowFunctionOptimizationTest.java | 4 +- 12 files changed, 2332 insertions(+), 13 deletions(-) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNRankAccumulator.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNRankBuilder.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowIdComparisonHashStrategy.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/SimpleTsBlockWithPositionEqualsAndHash.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/TopNPeerGroupLookup.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/TsBlockWithPositionEqualsAndHash.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TopKRankingOperatorTest.java diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunction3IT.java 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunction3IT.java index d461f3a11fee6..52abba3629380 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunction3IT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunction3IT.java @@ -121,8 +121,8 @@ public void testPushDownFilterIntoWindow() { String[] expectedHeader = new String[] {"time", "device", "value", "rn"}; String[] retArray = new String[] { - "2021-01-01T09:10:00.000Z,d1,1.0,1,", "2021-01-01T09:05:00.000Z,d1,3.0,2,", + "2021-01-01T09:10:00.000Z,d1,1.0,1,", "2021-01-01T09:08:00.000Z,d2,2.0,1,", "2021-01-01T09:15:00.000Z,d2,4.0,2,", }; @@ -166,6 +166,84 @@ public void testReplaceWindowWithRowNumber() { DATABASE_NAME); } + @Test + public void testPushDownFilterIntoWindowWithRank() { + // Data: d1 values {3,5,3,1}, d2 values {2,4} + // rank(PARTITION BY device ORDER BY value): + // d1: 1.0→rank=1, 3.0→rank=2, 3.0→rank=2, 5.0→rank=4 + // d2: 2.0→rank=1, 4.0→rank=2 + // WHERE rk <= 2: keeps d1 rows with rank≤2 (3 rows due to tie), d2 all (2 rows) + String[] expectedHeader = new String[] {"time", "device", "value", "rk"}; + String[] retArray = + new String[] { + "2021-01-01T09:05:00.000Z,d1,3.0,2,", + "2021-01-01T09:09:00.000Z,d1,3.0,2,", + "2021-01-01T09:10:00.000Z,d1,1.0,1,", + "2021-01-01T09:08:00.000Z,d2,2.0,1,", + "2021-01-01T09:15:00.000Z,d2,4.0,2,", + }; + tableResultSetEqualTest( + "SELECT * FROM (SELECT *, rank() OVER (PARTITION BY device ORDER BY value) as rk FROM demo) WHERE rk <= 2 ORDER BY device, time", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void testPushDownLimitIntoWindowWithRank() { + // TopKRanking(RANK, topN=2) keeps rank≤2 per partition, then LIMIT 2 on final result + // d1 rank≤2: 1.0(r=1), 3.0(r=2), 3.0(r=2) → 3 rows sorted by time: 09:05,09:09,09:10 + // d2 rank≤2: 2.0(r=1), 4.0(r=2) → 2 rows + // ORDER BY device, time LIMIT 2 → first 2 from d1 + 
String[] expectedHeader = new String[] {"time", "device", "value", "rk"}; + String[] retArray = + new String[] { + "2021-01-01T09:05:00.000Z,d1,3.0,2,", "2021-01-01T09:07:00.000Z,d1,5.0,4,", + }; + tableResultSetEqualTest( + "SELECT * FROM (SELECT *, rank() OVER (PARTITION BY device ORDER BY value) as rk FROM demo) ORDER BY device, time LIMIT 2", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void testRankBasic() { + // Verifies rank computation: ties get the same rank, gaps after ties + String[] expectedHeader = new String[] {"time", "device", "value", "rk"}; + String[] retArray = + new String[] { + "2021-01-01T09:05:00.000Z,d1,3.0,2,", + "2021-01-01T09:07:00.000Z,d1,5.0,4,", + "2021-01-01T09:09:00.000Z,d1,3.0,2,", + "2021-01-01T09:10:00.000Z,d1,1.0,1,", + "2021-01-01T09:08:00.000Z,d2,2.0,1,", + "2021-01-01T09:15:00.000Z,d2,4.0,2,", + }; + tableResultSetEqualTest( + "SELECT *, rank() OVER (PARTITION BY device ORDER BY value) as rk FROM demo ORDER BY device, time", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void testRankWithFilterEquals() { + // WHERE rk = 2 keeps only rows with rank exactly 2 (both d1 rows with value=3) + String[] expectedHeader = new String[] {"time", "device", "value", "rk"}; + String[] retArray = + new String[] { + "2021-01-01T09:05:00.000Z,d1,3.0,2,", + "2021-01-01T09:09:00.000Z,d1,3.0,2,", + "2021-01-01T09:15:00.000Z,d2,4.0,2,", + }; + tableResultSetEqualTest( + "SELECT * FROM (SELECT *, rank() OVER (PARTITION BY device ORDER BY value) as rk FROM demo) WHERE rk = 2 ORDER BY device, time", + expectedHeader, + retArray, + DATABASE_NAME); + } + @Test public void testRemoveRedundantWindow() { String[] expectedHeader = new String[] {"time", "device", "value", "rn"}; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNRankAccumulator.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNRankAccumulator.java new file mode 100644 index 0000000000000..a0830657eff7b --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNRankAccumulator.java @@ -0,0 +1,735 @@ +package org.apache.iotdb.db.queryengine.execution.operator; + +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.LongBigArray; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.LongBigArrayFIFOQueue; +import org.apache.iotdb.db.utils.HeapTraversal; + +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.function.LongConsumer; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; +import static java.util.Objects.requireNonNull; + +public class GroupedTopNRankAccumulator { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(GroupedTopNRankAccumulator.class); + private static final long UNKNOWN_INDEX = -1; + private static final long NULL_GROUP_ID = -1; + + private final GroupIdToHeapBuffer groupIdToHeapBuffer = new GroupIdToHeapBuffer(); + private final HeapNodeBuffer heapNodeBuffer = new HeapNodeBuffer(); + private final PeerGroupBuffer peerGroupBuffer = new PeerGroupBuffer(); + private final HeapTraversal heapTraversal = new HeapTraversal(); + + // Map from (Group ID, Row Value) to Heap Node Index where the value is stored + private final TopNPeerGroupLookup peerGroupLookup; + + private final RowIdComparisonHashStrategy strategy; + private final int topN; + private final LongConsumer rowIdEvictionListener; + + public GroupedTopNRankAccumulator( + RowIdComparisonHashStrategy strategy, int topN, LongConsumer 
rowIdEvictionListener) { + this.strategy = requireNonNull(strategy, "strategy is null"); + this.peerGroupLookup = new TopNPeerGroupLookup(10_000, strategy, NULL_GROUP_ID, UNKNOWN_INDEX); + checkArgument(topN > 0, "topN must be greater than zero"); + this.topN = topN; + this.rowIdEvictionListener = + requireNonNull(rowIdEvictionListener, "rowIdEvictionListener is null"); + } + + public long sizeOf() { + return INSTANCE_SIZE + + groupIdToHeapBuffer.sizeOf() + + heapNodeBuffer.sizeOf() + + peerGroupBuffer.sizeOf() + + heapTraversal.sizeOf() + + peerGroupLookup.sizeOf(); + } + + public int findFirstPositionToAdd( + TsBlock newPage, + int groupCount, + int[] groupIds, + TsBlockWithPositionComparator comparator, + RowReferenceTsBlockManager pageManager) { + int currentGroups = groupIdToHeapBuffer.getTotalGroups(); + groupIdToHeapBuffer.allocateGroupIfNeeded(groupCount); + + for (int position = 0; position < newPage.getPositionCount(); position++) { + int groupId = groupIds[position]; + if (groupId >= currentGroups || groupIdToHeapBuffer.getHeapValueCount(groupId) < topN) { + return position; + } + long heapRootNodeIndex = groupIdToHeapBuffer.getHeapRootNodeIndex(groupId); + if (heapRootNodeIndex == UNKNOWN_INDEX) { + return position; + } + long rightPageRowId = peekRootRowIdByHeapNodeIndex(heapRootNodeIndex); + TsBlock rightPage = pageManager.getTsBlock(rightPageRowId); + int rightPosition = pageManager.getPosition(rightPageRowId); + // If the current position is equal to or less than the current heap root index, then we may + // need to insert it + if (comparator.compareTo(newPage, position, rightPage, rightPosition) <= 0) { + return position; + } + } + return -1; + } + + /** + * Add the specified row to this accumulator. + * + *

This may trigger row eviction callbacks if other rows have to be evicted to make space. + * + * @return true if this row was incorporated, false otherwise + */ + public boolean add(int groupId, RowReference rowReference) { + // Insert to any existing peer groups first (heap nodes contain distinct values) + long peerHeapNodeIndex = peerGroupLookup.get(groupId, rowReference); + if (peerHeapNodeIndex != UNKNOWN_INDEX) { + directPeerGroupInsert(groupId, peerHeapNodeIndex, rowReference.allocateRowId()); + if (calculateRootRank(groupId, groupIdToHeapBuffer.getHeapRootNodeIndex(groupId)) > topN) { + heapPop(groupId, rowIdEvictionListener); + } + // Return true because heapPop is guaranteed not to evict the newly inserted row (by + // definition of rank) + return true; + } + + groupIdToHeapBuffer.allocateGroupIfNeeded(groupId); + if (groupIdToHeapBuffer.getHeapValueCount(groupId) < topN) { + // Always safe to insert if total number of values is still less than topN + long newPeerGroupIndex = + peerGroupBuffer.allocateNewNode(rowReference.allocateRowId(), UNKNOWN_INDEX); + heapInsert(groupId, newPeerGroupIndex, 1); + return true; + } + long heapRootNodeIndex = groupIdToHeapBuffer.getHeapRootNodeIndex(groupId); + if (rowReference.compareTo(strategy, peekRootRowIdByHeapNodeIndex(heapRootNodeIndex)) < 0) { + // Given that total number of values >= topN, we can only consider values that are less than + // the root (otherwise topN would be violated) + long newPeerGroupIndex = + peerGroupBuffer.allocateNewNode(rowReference.allocateRowId(), UNKNOWN_INDEX); + // Rank will increase by +1 after insertion, so only need to pop if root rank is already == + // topN. 
+ if (calculateRootRank(groupId, heapRootNodeIndex) < topN) { + heapInsert(groupId, newPeerGroupIndex, 1); + } else { + heapPopAndInsert(groupId, newPeerGroupIndex, 1, rowIdEvictionListener); + } + return true; + } + // Row cannot be accepted because the total number of values >= topN, and the row is greater + // than the root (meaning its rank would be at least topN+1). + return false; + } + + /** + * Drain the contents of this accumulator to the provided output row ID and ranking buffer. + + *

Rows will be presented in increasing rank order. Draining will not trigger any row eviction + * callbacks. After this method completion, the Accumulator will contain zero rows for the + * specified groupId. + * + * @return number of rows deposited to the output buffers + */ + public long drainTo(int groupId, LongBigArray rowIdOutput, LongBigArray rankingOutput) { + long valueCount = groupIdToHeapBuffer.getHeapValueCount(groupId); + rowIdOutput.ensureCapacity(valueCount); + rankingOutput.ensureCapacity(valueCount); + + // Heap is inverted to output order, so insert back to front + long insertionIndex = valueCount - 1; + while (insertionIndex >= 0) { + long heapRootNodeIndex = groupIdToHeapBuffer.getHeapRootNodeIndex(groupId); + verify(heapRootNodeIndex != UNKNOWN_INDEX); + + long peerGroupIndex = heapNodeBuffer.getPeerGroupIndex(heapRootNodeIndex); + verify(peerGroupIndex != UNKNOWN_INDEX, "Peer group should have at least one value"); + + long rank = calculateRootRank(groupId, heapRootNodeIndex); + do { + rowIdOutput.set(insertionIndex, peerGroupBuffer.getRowId(peerGroupIndex)); + rankingOutput.set(insertionIndex, rank); + insertionIndex--; + peerGroupIndex = peerGroupBuffer.getNextPeerIndex(peerGroupIndex); + } while (peerGroupIndex != UNKNOWN_INDEX); + + heapPop(groupId, null); + } + return valueCount; + } + + /** + * Drain the contents of this accumulator to the provided output row ID. + * + *

Rows will be presented in increasing rank order. Draining will not trigger any row eviction + * callbacks. After this method completion, the Accumulator will contain zero rows for the + * specified groupId. + * + * @return number of rows deposited to the output buffer + */ + public long drainTo(int groupId, LongBigArray rowIdOutput) { + long valueCount = groupIdToHeapBuffer.getHeapValueCount(groupId); + rowIdOutput.ensureCapacity(valueCount); + + // Heap is inverted to output order, so insert back to front + long insertionIndex = valueCount - 1; + while (insertionIndex >= 0) { + long heapRootNodeIndex = groupIdToHeapBuffer.getHeapRootNodeIndex(groupId); + verify(heapRootNodeIndex != UNKNOWN_INDEX); + + long peerGroupIndex = heapNodeBuffer.getPeerGroupIndex(heapRootNodeIndex); + verify(peerGroupIndex != UNKNOWN_INDEX, "Peer group should have at least one value"); + + do { + rowIdOutput.set(insertionIndex, peerGroupBuffer.getRowId(peerGroupIndex)); + insertionIndex--; + peerGroupIndex = peerGroupBuffer.getNextPeerIndex(peerGroupIndex); + } while (peerGroupIndex != UNKNOWN_INDEX); + + heapPop(groupId, null); + } + return valueCount; + } + + private long calculateRootRank(int groupId, long heapRootIndex) { + long heapValueCount = groupIdToHeapBuffer.getHeapValueCount(groupId); + checkArgument(heapRootIndex != UNKNOWN_INDEX, "Group does not have a root"); + long rootPeerGroupCount = heapNodeBuffer.getPeerGroupCount(heapRootIndex); + return heapValueCount - rootPeerGroupCount + 1; + } + + private void directPeerGroupInsert(int groupId, long heapNodeIndex, long rowId) { + long existingPeerGroupIndex = heapNodeBuffer.getPeerGroupIndex(heapNodeIndex); + long newPeerGroupIndex = peerGroupBuffer.allocateNewNode(rowId, existingPeerGroupIndex); + heapNodeBuffer.setPeerGroupIndex(heapNodeIndex, newPeerGroupIndex); + heapNodeBuffer.incrementPeerGroupCount(heapNodeIndex); + groupIdToHeapBuffer.incrementHeapValueCount(groupId); + } + + private long peekRootRowIdByHeapNodeIndex(long 
heapRootNodeIndex) { + checkArgument(heapRootNodeIndex != UNKNOWN_INDEX, "Group has nothing to peek"); + return peerGroupBuffer.getRowId(heapNodeBuffer.getPeerGroupIndex(heapRootNodeIndex)); + } + + private long getChildIndex(long heapNodeIndex, HeapTraversal.Child child) { + return child == HeapTraversal.Child.LEFT + ? heapNodeBuffer.getLeftChildHeapIndex(heapNodeIndex) + : heapNodeBuffer.getRightChildHeapIndex(heapNodeIndex); + } + + private void setChildIndex(long heapNodeIndex, HeapTraversal.Child child, long newChildIndex) { + if (child == HeapTraversal.Child.LEFT) { + heapNodeBuffer.setLeftChildHeapIndex(heapNodeIndex, newChildIndex); + } else { + heapNodeBuffer.setRightChildHeapIndex(heapNodeIndex, newChildIndex); + } + } + + /** + * Pop the root node off the group ID's max heap. + * + * @param contextEvictionListener optional callback for the root node that gets popped off + */ + private void heapPop(int groupId, LongConsumer contextEvictionListener) { + long heapRootNodeIndex = groupIdToHeapBuffer.getHeapRootNodeIndex(groupId); + checkArgument(heapRootNodeIndex != UNKNOWN_INDEX, "Group ID has an empty heap"); + + long lastHeapNodeIndex = heapDetachLastInsertionLeaf(groupId); + long lastPeerGroupIndex = heapNodeBuffer.getPeerGroupIndex(lastHeapNodeIndex); + long lastPeerGroupCount = heapNodeBuffer.getPeerGroupCount(lastHeapNodeIndex); + + if (lastHeapNodeIndex == heapRootNodeIndex) { + // The root is the last node remaining + dropHeapNodePeerGroup(groupId, lastHeapNodeIndex, contextEvictionListener); + } else { + // Pop the root and insert the last peer group back into the heap to ensure a balanced tree + heapPopAndInsert(groupId, lastPeerGroupIndex, lastPeerGroupCount, contextEvictionListener); + } + + // peerGroupLookup entry will be updated by definition of inserting the last peer group into a + // new node + heapNodeBuffer.deallocate(lastHeapNodeIndex); + } + + /** + * Detaches (but does not deallocate) the leaf in the bottom right-most position in the 
heap. + * + *

Given the fixed insertion order, the bottom right-most leaf will correspond to the last leaf + * node inserted into the balanced heap. + * + * @return leaf node index that was detached from the heap + */ + private long heapDetachLastInsertionLeaf(int groupId) { + long heapRootNodeIndex = groupIdToHeapBuffer.getHeapRootNodeIndex(groupId); + long heapSize = groupIdToHeapBuffer.getHeapSize(groupId); + + long previousNodeIndex = UNKNOWN_INDEX; + HeapTraversal.Child childPosition = null; + long currentNodeIndex = heapRootNodeIndex; + + heapTraversal.resetWithPathTo(heapSize); + while (!heapTraversal.isTarget()) { + previousNodeIndex = currentNodeIndex; + childPosition = heapTraversal.nextChild(); + currentNodeIndex = getChildIndex(currentNodeIndex, childPosition); + verify(currentNodeIndex != UNKNOWN_INDEX, "Target node must exist"); + } + + // Detach the last insertion leaf node, but do not deallocate yet + if (previousNodeIndex == UNKNOWN_INDEX) { + // Last insertion leaf was the root node + groupIdToHeapBuffer.setHeapRootNodeIndex(groupId, UNKNOWN_INDEX); + groupIdToHeapBuffer.setHeapValueCount(groupId, 0); + groupIdToHeapBuffer.setHeapSize(groupId, 0); + } else { + setChildIndex(previousNodeIndex, childPosition, UNKNOWN_INDEX); + groupIdToHeapBuffer.addHeapValueCount( + groupId, -heapNodeBuffer.getPeerGroupCount(currentNodeIndex)); + groupIdToHeapBuffer.addHeapSize(groupId, -1); + } + + return currentNodeIndex; + } + + /** + * Inserts a new row into the heap for the specified group ID. + * + *

The technique involves traversing the heap from the root to a new bottom left-priority leaf + * position, potentially swapping heap nodes along the way to find the proper insertion position + * for the new row. Insertions always fill the left child before the right, and fill up an entire + * heap level before moving to the next level. + */ + private void heapInsert(int groupId, long newPeerGroupIndex, long newPeerGroupCount) { + long newCanonicalRowId = peerGroupBuffer.getRowId(newPeerGroupIndex); + + long heapRootNodeIndex = groupIdToHeapBuffer.getHeapRootNodeIndex(groupId); + if (heapRootNodeIndex == UNKNOWN_INDEX) { + // Heap is currently empty, so this will be the first node + heapRootNodeIndex = heapNodeBuffer.allocateNewNode(newPeerGroupIndex, newPeerGroupCount); + verify(peerGroupLookup.put(groupId, newCanonicalRowId, heapRootNodeIndex) == UNKNOWN_INDEX); + groupIdToHeapBuffer.setHeapRootNodeIndex(groupId, heapRootNodeIndex); + groupIdToHeapBuffer.setHeapValueCount(groupId, newPeerGroupCount); + groupIdToHeapBuffer.setHeapSize(groupId, 1); + return; + } + + long previousHeapNodeIndex = UNKNOWN_INDEX; + HeapTraversal.Child childPosition = null; + long currentHeapNodeIndex = heapRootNodeIndex; + boolean swapped = false; + + groupIdToHeapBuffer.addHeapValueCount(groupId, newPeerGroupCount); + groupIdToHeapBuffer.incrementHeapSize(groupId); + heapTraversal.resetWithPathTo(groupIdToHeapBuffer.getHeapSize(groupId)); + while (!heapTraversal.isTarget()) { + long peerGroupIndex = heapNodeBuffer.getPeerGroupIndex(currentHeapNodeIndex); + long currentCanonicalRowId = peerGroupBuffer.getRowId(peerGroupIndex); + // We can short-circuit the check if a parent has already been swapped because the new row to + // insert must + // be greater than all of it's children. 
+ if (swapped || strategy.compare(newCanonicalRowId, currentCanonicalRowId) > 0) { + long peerGroupCount = heapNodeBuffer.getPeerGroupCount(currentHeapNodeIndex); + + // Swap the peer groups + heapNodeBuffer.setPeerGroupIndex(currentHeapNodeIndex, newPeerGroupIndex); + heapNodeBuffer.setPeerGroupCount(currentHeapNodeIndex, newPeerGroupCount); + peerGroupLookup.put(groupId, newCanonicalRowId, currentHeapNodeIndex); + + newPeerGroupIndex = peerGroupIndex; + newPeerGroupCount = peerGroupCount; + newCanonicalRowId = currentCanonicalRowId; + swapped = true; + } + + previousHeapNodeIndex = currentHeapNodeIndex; + childPosition = heapTraversal.nextChild(); + currentHeapNodeIndex = getChildIndex(currentHeapNodeIndex, childPosition); + } + + verify( + previousHeapNodeIndex != UNKNOWN_INDEX && childPosition != null, + "heap must have at least one node before starting traversal"); + verify(currentHeapNodeIndex == UNKNOWN_INDEX, "New child shouldn't exist yet"); + + long newHeapNodeIndex = heapNodeBuffer.allocateNewNode(newPeerGroupIndex, newPeerGroupCount); + peerGroupLookup.put(groupId, newCanonicalRowId, newHeapNodeIndex); + + // Link the new child to the parent + setChildIndex(previousHeapNodeIndex, childPosition, newHeapNodeIndex); + } + + /** + * Pop the root off the group ID's max heap and insert the new peer group. + * + *

These two operations are more efficient if performed together. The technique involves + * swapping the new row into the root position, and applying a heap down bubbling operation to + * heap-ify. + * + * @param contextEvictionListener optional callback for the root node that gets popped off + */ + private void heapPopAndInsert( + int groupId, + long newPeerGroupIndex, + long newPeerGroupCount, + LongConsumer contextEvictionListener) { + long heapRootNodeIndex = groupIdToHeapBuffer.getHeapRootNodeIndex(groupId); + checkState(heapRootNodeIndex != UNKNOWN_INDEX, "popAndInsert() requires at least a root node"); + + // Clear contents of the root node to create a vacancy for the new peer group + groupIdToHeapBuffer.addHeapValueCount( + groupId, newPeerGroupCount - heapNodeBuffer.getPeerGroupCount(heapRootNodeIndex)); + dropHeapNodePeerGroup(groupId, heapRootNodeIndex, contextEvictionListener); + + long newCanonicalRowId = peerGroupBuffer.getRowId(newPeerGroupIndex); + + long currentNodeIndex = heapRootNodeIndex; + while (true) { + long maxChildNodeIndex = heapNodeBuffer.getLeftChildHeapIndex(currentNodeIndex); + if (maxChildNodeIndex == UNKNOWN_INDEX) { + // Left is always inserted before right, so a missing left child means there can't be a + // right child, + // which means this must already be a leaf position. 
+ break; + } + long maxChildPeerGroupIndex = heapNodeBuffer.getPeerGroupIndex(maxChildNodeIndex); + long maxChildCanonicalRowId = peerGroupBuffer.getRowId(maxChildPeerGroupIndex); + + long rightChildNodeIndex = heapNodeBuffer.getRightChildHeapIndex(currentNodeIndex); + if (rightChildNodeIndex != UNKNOWN_INDEX) { + long rightChildPeerGroupIndex = heapNodeBuffer.getPeerGroupIndex(rightChildNodeIndex); + long rightChildCanonicalRowId = peerGroupBuffer.getRowId(rightChildPeerGroupIndex); + if (strategy.compare(rightChildCanonicalRowId, maxChildCanonicalRowId) > 0) { + maxChildNodeIndex = rightChildNodeIndex; + maxChildPeerGroupIndex = rightChildPeerGroupIndex; + maxChildCanonicalRowId = rightChildCanonicalRowId; + } + } + + if (strategy.compare(newCanonicalRowId, maxChildCanonicalRowId) >= 0) { + // New row is greater than or equal to both children, so the heap invariant is satisfied by + // inserting the + // new row at this position + break; + } + + // Swap the max child row value into the current node + heapNodeBuffer.setPeerGroupIndex(currentNodeIndex, maxChildPeerGroupIndex); + heapNodeBuffer.setPeerGroupCount( + currentNodeIndex, heapNodeBuffer.getPeerGroupCount(maxChildNodeIndex)); + peerGroupLookup.put(groupId, maxChildCanonicalRowId, currentNodeIndex); + + // Max child now has an unfilled vacancy, so continue processing with that as the current node + currentNodeIndex = maxChildNodeIndex; + } + + heapNodeBuffer.setPeerGroupIndex(currentNodeIndex, newPeerGroupIndex); + heapNodeBuffer.setPeerGroupCount(currentNodeIndex, newPeerGroupCount); + peerGroupLookup.put(groupId, newCanonicalRowId, currentNodeIndex); + } + + /** + * Deallocates all peer group associations for this heap node, leaving a structural husk with no + * contents. Assumes that any required group level metric changes are handled externally. 
+ */ + private void dropHeapNodePeerGroup( + int groupId, long heapNodeIndex, LongConsumer contextEvictionListener) { + long peerGroupIndex = heapNodeBuffer.getPeerGroupIndex(heapNodeIndex); + checkState(peerGroupIndex != UNKNOWN_INDEX, "Heap node must have at least one peer group"); + + long rowId = peerGroupBuffer.getRowId(peerGroupIndex); + long nextPeerIndex = peerGroupBuffer.getNextPeerIndex(peerGroupIndex); + peerGroupBuffer.deallocate(peerGroupIndex); + verify(peerGroupLookup.remove(groupId, rowId) == heapNodeIndex); + + if (contextEvictionListener != null) { + contextEvictionListener.accept(rowId); + } + + peerGroupIndex = nextPeerIndex; + + while (peerGroupIndex != UNKNOWN_INDEX) { + rowId = peerGroupBuffer.getRowId(peerGroupIndex); + nextPeerIndex = peerGroupBuffer.getNextPeerIndex(peerGroupIndex); + peerGroupBuffer.deallocate(peerGroupIndex); + + if (contextEvictionListener != null) { + contextEvictionListener.accept(rowId); + } + + peerGroupIndex = nextPeerIndex; + } + } + + /** + * Buffer abstracting a mapping from group ID to a heap. The group ID provides the index for all + * operations. + */ + private static final class GroupIdToHeapBuffer { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(GroupIdToHeapBuffer.class); + private static final int METRICS_POSITIONS_PER_ENTRY = 2; + private static final int METRICS_HEAP_SIZE_OFFSET = 1; + + /* + * Memory layout: + * [LONG] heapNodeIndex1, + * [LONG] heapNodeIndex2, + * ... + */ + // Since we have a single element per group, this array is effectively indexed on group ID + private final LongBigArray heapIndexBuffer = new LongBigArray(UNKNOWN_INDEX); + + /* + * Memory layout: + * [LONG] valueCount1, [LONG] heapSize1, + * [LONG] valueCount2, [LONG] heapSize2, + * ... 
+ */ + private final LongBigArray metricsBuffer = new LongBigArray(0); + + private int totalGroups; + + public void allocateGroupIfNeeded(int groupId) { + if (totalGroups > groupId) { + return; + } + // Group IDs generated by GroupByHash are always generated consecutively starting from 0, so + // observing a + // group ID N means groups [0, N] inclusive must exist. + totalGroups = groupId + 1; + heapIndexBuffer.ensureCapacity(totalGroups); + metricsBuffer.ensureCapacity((long) totalGroups * METRICS_POSITIONS_PER_ENTRY); + } + + public int getTotalGroups() { + return totalGroups; + } + + public long getHeapRootNodeIndex(int groupId) { + return heapIndexBuffer.get(groupId); + } + + public void setHeapRootNodeIndex(int groupId, long heapNodeIndex) { + heapIndexBuffer.set(groupId, heapNodeIndex); + } + + public long getHeapValueCount(int groupId) { + return metricsBuffer.get((long) groupId * METRICS_POSITIONS_PER_ENTRY); + } + + public void setHeapValueCount(int groupId, long count) { + metricsBuffer.set((long) groupId * METRICS_POSITIONS_PER_ENTRY, count); + } + + public void addHeapValueCount(int groupId, long delta) { + metricsBuffer.add((long) groupId * METRICS_POSITIONS_PER_ENTRY, delta); + } + + public void incrementHeapValueCount(int groupId) { + metricsBuffer.increment((long) groupId * METRICS_POSITIONS_PER_ENTRY); + } + + public long getHeapSize(int groupId) { + return metricsBuffer.get( + (long) groupId * METRICS_POSITIONS_PER_ENTRY + METRICS_HEAP_SIZE_OFFSET); + } + + public void setHeapSize(int groupId, long size) { + metricsBuffer.set( + (long) groupId * METRICS_POSITIONS_PER_ENTRY + METRICS_HEAP_SIZE_OFFSET, size); + } + + public void addHeapSize(int groupId, long delta) { + metricsBuffer.add( + (long) groupId * METRICS_POSITIONS_PER_ENTRY + METRICS_HEAP_SIZE_OFFSET, delta); + } + + public void incrementHeapSize(int groupId) { + metricsBuffer.increment( + (long) groupId * METRICS_POSITIONS_PER_ENTRY + METRICS_HEAP_SIZE_OFFSET); + } + + public long 
sizeOf() { + return INSTANCE_SIZE + heapIndexBuffer.sizeOf() + metricsBuffer.sizeOf(); + } + } + + /** + * Buffer abstracting storage of nodes in the heap. Nodes are referenced by their node index for + * operations. + */ + private static final class HeapNodeBuffer { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(HeapNodeBuffer.class); + private static final int POSITIONS_PER_ENTRY = 4; + private static final int PEER_GROUP_COUNT_OFFSET = 1; + private static final int LEFT_CHILD_HEAP_INDEX_OFFSET = 2; + private static final int RIGHT_CHILD_HEAP_INDEX_OFFSET = 3; + + /* + * Memory layout: + * [LONG] peerGroupIndex1, [LONG] peerGroupCount1, [LONG] leftChildNodeIndex1, [LONG] rightChildNodeIndex1, + * [LONG] peerGroupIndex2, [LONG] peerGroupCount2, [LONG] leftChildNodeIndex2, [LONG] rightChildNodeIndex2, + * ... + */ + private final LongBigArray buffer = new LongBigArray(); + + private final LongBigArrayFIFOQueue emptySlots = new LongBigArrayFIFOQueue(); + + private long capacity; + + /** + * Allocates storage for a new heap node. 
+ * + * @return index referencing the node + */ + public long allocateNewNode(long peerGroupIndex, long peerGroupCount) { + long newHeapIndex; + if (!emptySlots.isEmpty()) { + newHeapIndex = emptySlots.dequeueLong(); + } else { + newHeapIndex = capacity; + capacity++; + buffer.ensureCapacity(capacity * POSITIONS_PER_ENTRY); + } + + setPeerGroupIndex(newHeapIndex, peerGroupIndex); + setPeerGroupCount(newHeapIndex, peerGroupCount); + setLeftChildHeapIndex(newHeapIndex, UNKNOWN_INDEX); + setRightChildHeapIndex(newHeapIndex, UNKNOWN_INDEX); + + return newHeapIndex; + } + + public void deallocate(long index) { + emptySlots.enqueue(index); + } + + public long getActiveNodeCount() { + return capacity - emptySlots.longSize(); + } + + public long getPeerGroupIndex(long index) { + return buffer.get(index * POSITIONS_PER_ENTRY); + } + + public void setPeerGroupIndex(long index, long peerGroupIndex) { + buffer.set(index * POSITIONS_PER_ENTRY, peerGroupIndex); + } + + public long getPeerGroupCount(long index) { + return buffer.get(index * POSITIONS_PER_ENTRY + PEER_GROUP_COUNT_OFFSET); + } + + public void setPeerGroupCount(long index, long peerGroupCount) { + buffer.set(index * POSITIONS_PER_ENTRY + PEER_GROUP_COUNT_OFFSET, peerGroupCount); + } + + public void incrementPeerGroupCount(long index) { + buffer.increment(index * POSITIONS_PER_ENTRY + PEER_GROUP_COUNT_OFFSET); + } + + public void addPeerGroupCount(long index, long delta) { + buffer.add(index * POSITIONS_PER_ENTRY + PEER_GROUP_COUNT_OFFSET, delta); + } + + public long getLeftChildHeapIndex(long index) { + return buffer.get(index * POSITIONS_PER_ENTRY + LEFT_CHILD_HEAP_INDEX_OFFSET); + } + + public void setLeftChildHeapIndex(long index, long childHeapIndex) { + buffer.set(index * POSITIONS_PER_ENTRY + LEFT_CHILD_HEAP_INDEX_OFFSET, childHeapIndex); + } + + public long getRightChildHeapIndex(long index) { + return buffer.get(index * POSITIONS_PER_ENTRY + RIGHT_CHILD_HEAP_INDEX_OFFSET); + } + + public void 
setRightChildHeapIndex(long index, long childHeapIndex) { + buffer.set(index * POSITIONS_PER_ENTRY + RIGHT_CHILD_HEAP_INDEX_OFFSET, childHeapIndex); + } + + public long sizeOf() { + return INSTANCE_SIZE + buffer.sizeOf() + emptySlots.sizeOf(); + } + } + + /** + * Buffer abstracting storage of peer groups as linked chains of matching values. Peer groups are + * referenced by their node index for operations. + */ + private static final class PeerGroupBuffer { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(PeerGroupBuffer.class); + private static final int POSITIONS_PER_ENTRY = 2; + private static final int NEXT_PEER_INDEX_OFFSET = 1; + + /* + * Memory layout: + * [LONG] rowId1, [LONG] nextPeerIndex1, + * [LONG] rowId2, [LONG] nextPeerIndex2, + * ... + */ + private final LongBigArray buffer = new LongBigArray(); + + private final LongBigArrayFIFOQueue emptySlots = new LongBigArrayFIFOQueue(); + + private long capacity; + + /** + * Allocates storage for a new peer group node. 
+ * + * @return index referencing the node + */ + public long allocateNewNode(long rowId, long nextPeerIndex) { + long newPeerIndex; + if (!emptySlots.isEmpty()) { + newPeerIndex = emptySlots.dequeueLong(); + } else { + newPeerIndex = capacity; + capacity++; + buffer.ensureCapacity(capacity * POSITIONS_PER_ENTRY); + } + + setRowId(newPeerIndex, rowId); + setNextPeerIndex(newPeerIndex, nextPeerIndex); + + return newPeerIndex; + } + + public void deallocate(long index) { + emptySlots.enqueue(index); + } + + public long getActiveNodeCount() { + return capacity - emptySlots.longSize(); + } + + public long getRowId(long index) { + return buffer.get(index * POSITIONS_PER_ENTRY); + } + + public void setRowId(long index, long rowId) { + buffer.set(index * POSITIONS_PER_ENTRY, rowId); + } + + public long getNextPeerIndex(long index) { + return buffer.get(index * POSITIONS_PER_ENTRY + NEXT_PEER_INDEX_OFFSET); + } + + public void setNextPeerIndex(long index, long nextPeerIndex) { + buffer.set(index * POSITIONS_PER_ENTRY + NEXT_PEER_INDEX_OFFSET, nextPeerIndex); + } + + public long sizeOf() { + return INSTANCE_SIZE + buffer.sizeOf() + emptySlots.sizeOf(); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNRankBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNRankBuilder.java new file mode 100644 index 0000000000000..dcfc8dbcee14b --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNRankBuilder.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator; + +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.LongBigArray; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.hash.GroupByHash; + +import com.google.common.collect.AbstractIterator; +import com.google.common.collect.ImmutableList; +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.Iterator; +import java.util.List; + +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.TIME_COLUMN_TEMPLATE; + +/** + * Finds the top N rows by rank value for each group. Unlike row_number which assigns unique + * sequential numbers, rank assigns the same number to rows with equal sort key values (peers). 
+ */ +public class GroupedTopNRankBuilder implements GroupedTopNBuilder { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(GroupedTopNRankBuilder.class); + + private final List sourceTypes; + private final boolean produceRanking; + private final int[] groupByChannels; + private final GroupByHash groupByHash; + private final TsBlockWithPositionComparator comparator; + private final RowReferenceTsBlockManager tsBlockManager = new RowReferenceTsBlockManager(); + private final GroupedTopNRankAccumulator groupedTopNRankAccumulator; + + private int effectiveGroupCount = 0; + + public GroupedTopNRankBuilder( + List sourceTypes, + TsBlockWithPositionComparator comparator, + TsBlockWithPositionEqualsAndHash equalsAndHash, + int topN, + boolean produceRanking, + int[] groupByChannels, + GroupByHash groupByHash) { + this.sourceTypes = sourceTypes; + this.produceRanking = produceRanking; + this.groupByChannels = groupByChannels; + this.groupByHash = groupByHash; + this.comparator = comparator; + + this.groupedTopNRankAccumulator = + new GroupedTopNRankAccumulator( + new RowIdComparisonHashStrategy() { + @Override + public int compare(long leftRowId, long rightRowId) { + TsBlock leftTsBlock = tsBlockManager.getTsBlock(leftRowId); + int leftPosition = tsBlockManager.getPosition(leftRowId); + TsBlock rightTsBlock = tsBlockManager.getTsBlock(rightRowId); + int rightPosition = tsBlockManager.getPosition(rightRowId); + return comparator.compareTo(leftTsBlock, leftPosition, rightTsBlock, rightPosition); + } + + @Override + public boolean equals(long leftRowId, long rightRowId) { + TsBlock leftTsBlock = tsBlockManager.getTsBlock(leftRowId); + int leftPosition = tsBlockManager.getPosition(leftRowId); + TsBlock rightTsBlock = tsBlockManager.getTsBlock(rightRowId); + int rightPosition = tsBlockManager.getPosition(rightRowId); + return equalsAndHash.equals(leftTsBlock, leftPosition, rightTsBlock, rightPosition); + } + + @Override + public long 
hashCode(long rowId) { + TsBlock tsBlock = tsBlockManager.getTsBlock(rowId); + int position = tsBlockManager.getPosition(rowId); + return equalsAndHash.hashCode(tsBlock, position); + } + }, + topN, + tsBlockManager::dereference); + } + + @Override + public void addTsBlock(TsBlock tsBlock) { + int[] groupIds; + if (groupByChannels.length == 0) { + groupIds = new int[tsBlock.getPositionCount()]; + if (tsBlock.getPositionCount() > 0) { + effectiveGroupCount = 1; + } + } else { + groupIds = groupByHash.getGroupIds(tsBlock.getColumns(groupByChannels)); + effectiveGroupCount = groupByHash.getGroupCount(); + } + + processTsBlock(tsBlock, effectiveGroupCount, groupIds); + } + + @Override + public Iterator getResult() { + return new ResultIterator(); + } + + @Override + public long getEstimatedSizeInBytes() { + return INSTANCE_SIZE + + groupByHash.getEstimatedSize() + + tsBlockManager.sizeOf() + + groupedTopNRankAccumulator.sizeOf(); + } + + private void processTsBlock(TsBlock newTsBlock, int groupCount, int[] groupIds) { + int firstPositionToAdd = + groupedTopNRankAccumulator.findFirstPositionToAdd( + newTsBlock, groupCount, groupIds, comparator, tsBlockManager); + if (firstPositionToAdd < 0) { + return; + } + + try (RowReferenceTsBlockManager.LoadCursor loadCursor = + tsBlockManager.add(newTsBlock, firstPositionToAdd)) { + for (int position = firstPositionToAdd; + position < newTsBlock.getPositionCount(); + position++) { + int groupId = groupIds[position]; + loadCursor.advance(); + groupedTopNRankAccumulator.add(groupId, loadCursor); + } + } + + tsBlockManager.compactIfNeeded(); + } + + private class ResultIterator extends AbstractIterator { + private final TsBlockBuilder tsBlockBuilder; + private final int groupIdCount = effectiveGroupCount; + private int currentGroupId = -1; + private final LongBigArray rowIdOutput = new LongBigArray(); + private final LongBigArray rankingOutput = new LongBigArray(); + private long currentGroupSize; + private int currentIndexInGroup; + 
+ ResultIterator() { + ImmutableList.Builder sourceTypesBuilders = + ImmutableList.builder().addAll(sourceTypes); + if (produceRanking) { + sourceTypesBuilders.add(TSDataType.INT64); + } + tsBlockBuilder = new TsBlockBuilder(sourceTypesBuilders.build()); + } + + @Override + protected TsBlock computeNext() { + tsBlockBuilder.reset(); + while (!tsBlockBuilder.isFull()) { + while (currentIndexInGroup >= currentGroupSize) { + if (currentGroupId + 1 >= groupIdCount) { + if (tsBlockBuilder.isEmpty()) { + return endOfData(); + } + return tsBlockBuilder.build( + new RunLengthEncodedColumn( + TIME_COLUMN_TEMPLATE, tsBlockBuilder.getPositionCount())); + } + currentGroupId++; + currentGroupSize = + produceRanking + ? groupedTopNRankAccumulator.drainTo(currentGroupId, rowIdOutput, rankingOutput) + : groupedTopNRankAccumulator.drainTo(currentGroupId, rowIdOutput); + currentIndexInGroup = 0; + } + + long rowId = rowIdOutput.get(currentIndexInGroup); + TsBlock tsBlock = tsBlockManager.getTsBlock(rowId); + int position = tsBlockManager.getPosition(rowId); + for (int i = 0; i < sourceTypes.size(); i++) { + ColumnBuilder builder = tsBlockBuilder.getColumnBuilder(i); + Column column = tsBlock.getColumn(i); + builder.write(column, position); + } + if (produceRanking) { + ColumnBuilder builder = tsBlockBuilder.getColumnBuilder(sourceTypes.size()); + builder.writeLong(rankingOutput.get(currentIndexInGroup)); + } + tsBlockBuilder.declarePosition(); + currentIndexInGroup++; + + tsBlockManager.dereference(rowId); + } + + if (tsBlockBuilder.isEmpty()) { + return endOfData(); + } + return tsBlockBuilder.build( + new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, tsBlockBuilder.getPositionCount())); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowIdComparisonHashStrategy.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowIdComparisonHashStrategy.java new file mode 100644 index 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iotdb.db.queryengine.execution.operator;

/**
 * Combined comparison and hashing strategy over opaque row IDs. Equality is derived from the
 * comparison: two rows are equal exactly when they compare as peers (compare == 0), which keeps
 * ordering and peer-group detection consistent for RANK-style accumulation.
 */
public interface RowIdComparisonHashStrategy extends RowIdComparisonStrategy, RowIdHashStrategy {
  @Override
  default boolean equals(long leftRowId, long rightRowId) {
    // Equality must agree with the comparator so peers form well-defined groups.
    return compare(leftRowId, rightRowId) == 0;
  }
}
+ */ + +package org.apache.iotdb.db.queryengine.execution.operator; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; + +import java.util.ArrayList; +import java.util.List; + +/** + * Computes equality and hash based on specified column channels of a TsBlock. Used for peer group + * detection in RANK window functions. + */ +public class SimpleTsBlockWithPositionEqualsAndHash implements TsBlockWithPositionEqualsAndHash { + private final List channels; + private final List types; + + public SimpleTsBlockWithPositionEqualsAndHash(List allTypes, List channels) { + this.channels = channels; + this.types = new ArrayList<>(channels.size()); + for (int channel : channels) { + types.add(allTypes.get(channel)); + } + } + + @Override + public boolean equals(TsBlock left, int leftPosition, TsBlock right, int rightPosition) { + for (int i = 0; i < channels.size(); i++) { + int channel = channels.get(i); + Column leftColumn = left.getColumn(channel); + Column rightColumn = right.getColumn(channel); + + boolean leftNull = leftColumn.isNull(leftPosition); + boolean rightNull = rightColumn.isNull(rightPosition); + if (leftNull != rightNull) { + return false; + } + if (leftNull) { + continue; + } + + if (!valueEquals(leftColumn, leftPosition, rightColumn, rightPosition, types.get(i))) { + return false; + } + } + return true; + } + + @Override + public long hashCode(TsBlock block, int position) { + long hash = 0; + for (int i = 0; i < channels.size(); i++) { + Column column = block.getColumn(channels.get(i)); + hash = hash * 31 + valueHash(column, position, types.get(i)); + } + return hash; + } + + private static boolean valueEquals( + Column left, int leftPos, Column right, int rightPos, TSDataType type) { + switch (type) { + case INT32: + case DATE: + return left.getInt(leftPos) == right.getInt(rightPos); + case INT64: + case TIMESTAMP: + return left.getLong(leftPos) == 
right.getLong(rightPos); + case FLOAT: + return Float.compare(left.getFloat(leftPos), right.getFloat(rightPos)) == 0; + case DOUBLE: + return Double.compare(left.getDouble(leftPos), right.getDouble(rightPos)) == 0; + case BOOLEAN: + return left.getBoolean(leftPos) == right.getBoolean(rightPos); + case TEXT: + case BLOB: + case STRING: + return left.getBinary(leftPos).equals(right.getBinary(rightPos)); + default: + throw new IllegalArgumentException("Unsupported type: " + type); + } + } + + private static long valueHash(Column column, int position, TSDataType type) { + if (column.isNull(position)) { + return 0; + } + switch (type) { + case INT32: + case DATE: + return column.getInt(position); + case INT64: + case TIMESTAMP: + long v = column.getLong(position); + return v ^ (v >>> 32); + case FLOAT: + return Float.floatToIntBits(column.getFloat(position)); + case DOUBLE: + long dv = Double.doubleToLongBits(column.getDouble(position)); + return dv ^ (dv >>> 32); + case BOOLEAN: + return column.getBoolean(position) ? 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iotdb.db.queryengine.execution.operator;

import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.LongBigArray;

import org.apache.tsfile.utils.RamUsageEstimator;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

/**
 * Optimized hash table for streaming Top N peer group lookup operations.
 *
 * <p>Maps a (groupId, rowId) key pair to a long value using open addressing with linear probing.
 * Row equality and hashing are delegated to a {@link RowIdHashStrategy}, and the full hash of
 * every entry is stored alongside it so probes and rehashes never re-invoke the strategy.
 */
// Note: this code was forked from fastutil (http://fastutil.di.unimi.it/)
// Long2LongOpenCustomHashMap.
// Copyright (C) 2002-2019 Sebastiano Vigna
public class TopNPeerGroupLookup {
  private static final long INSTANCE_SIZE =
      RamUsageEstimator.shallowSizeOfInstance(TopNPeerGroupLookup.class);

  /** The buffer containing key and value data. */
  private Buffer buffer;

  /** The mask for wrapping a position counter. */
  private long mask;

  /** The hash strategy. */
  private final RowIdHashStrategy strategy;

  /** The current allocated table size. */
  private long tableSize;

  /** Threshold after which we rehash. */
  private long maxFill;

  /** The acceptable load factor. */
  private final float fillFactor;

  /** Number of entries in the set. */
  private long entryCount;

  /**
   * The value denoting unmapped group IDs. Since group IDs need to co-exist at all times with row
   * IDs, we only need to use one of the two IDs to indicate that a slot is unused. Group IDs have
   * been arbitrarily selected for that purpose.
   */
  private final long unmappedGroupId;

  /** The default return value for {@code get()}, {@code put()} and {@code remove()}. */
  private final long defaultReturnValue;

  /**
   * Standard hash table parameters are expected. {@code unmappedGroupId} specifies the internal
   * marker value for unmapped group IDs.
   */
  public TopNPeerGroupLookup(
      long expected,
      float fillFactor,
      RowIdHashStrategy strategy,
      long unmappedGroupId,
      long defaultReturnValue) {
    checkArgument(expected >= 0, "The expected number of elements must be nonnegative");
    checkArgument(
        fillFactor > 0 && fillFactor <= 1,
        "Load factor must be greater than 0 and smaller than or equal to 1");
    this.fillFactor = fillFactor;
    this.strategy = requireNonNull(strategy, "strategy is null");
    this.unmappedGroupId = unmappedGroupId;
    this.defaultReturnValue = defaultReturnValue;

    // Table size is always a power of two so that (hash & mask) is a valid slot index.
    tableSize = bigArraySize(expected, fillFactor);
    mask = tableSize - 1;
    maxFill = maxFill(tableSize, fillFactor);
    buffer = new Buffer(tableSize, unmappedGroupId);
  }

  public TopNPeerGroupLookup(
      long expected, RowIdHashStrategy strategy, long unmappedGroupId, long defaultReturnValue) {
    this(expected, 0.75f, strategy, unmappedGroupId, defaultReturnValue);
  }

  /** Returns the size of this hash map in bytes. */
  public long sizeOf() {
    return INSTANCE_SIZE + buffer.sizeOf();
  }

  public long size() {
    return entryCount;
  }

  public boolean isEmpty() {
    return entryCount == 0;
  }

  /**
   * Looks up the value for the (groupId, rowId) key pair, returning {@code defaultReturnValue}
   * when absent.
   */
  public long get(long groupId, long rowId) {
    checkArgument(groupId != unmappedGroupId, "Group ID cannot be the unmapped group ID");

    long hash = hash(groupId, rowId);
    long index = hash & mask;
    if (buffer.isEmptySlot(index)) {
      return defaultReturnValue;
    }
    // Compare the stored full hash first to avoid the more expensive strategy equality check.
    if (hash == buffer.getPrecomputedHash(index) && equals(groupId, rowId, index)) {
      return buffer.getValue(index);
    }
    // There's always an unused entry.
    while (true) {
      index = (index + 1) & mask;
      if (buffer.isEmptySlot(index)) {
        return defaultReturnValue;
      }
      if (hash == buffer.getPrecomputedHash(index) && equals(groupId, rowId, index)) {
        return buffer.getValue(index);
      }
    }
  }

  /**
   * Looks up the value for the (groupId, row) key pair, where the row is described by a
   * {@link RowReference} rather than a stored row ID.
   */
  public long get(long groupId, RowReference rowReference) {
    checkArgument(groupId != unmappedGroupId, "Group ID cannot be the unmapped group ID");

    long hash = hash(groupId, rowReference);
    long index = hash & mask;
    if (buffer.isEmptySlot(index)) {
      return defaultReturnValue;
    }
    if (hash == buffer.getPrecomputedHash(index) && equals(groupId, rowReference, index)) {
      return buffer.getValue(index);
    }
    // There's always an unused entry.
    while (true) {
      index = (index + 1) & mask;
      if (buffer.isEmptySlot(index)) {
        return defaultReturnValue;
      }
      if (hash == buffer.getPrecomputedHash(index) && equals(groupId, rowReference, index)) {
        return buffer.getValue(index);
      }
    }
  }

  /**
   * Associates {@code value} with the (groupId, rowId) key pair, returning the previous value or
   * {@code defaultReturnValue} if the key was absent.
   */
  public long put(long groupId, long rowId, long value) {
    checkArgument(groupId != unmappedGroupId, "Group ID cannot be the unmapped group ID");

    long hash = hash(groupId, rowId);

    long index = find(groupId, rowId, hash);
    if (index < 0) {
      // Negative result is the two's complement of the free slot to insert into.
      insert(twosComplement(index), groupId, rowId, hash, value);
      return defaultReturnValue;
    }
    long oldValue = buffer.getValue(index);
    buffer.setValue(index, value);
    return oldValue;
  }

  private long hash(long groupId, long rowId) {
    return mix(groupId * 31 + strategy.hashCode(rowId));
  }

  private long hash(long groupId, RowReference rowReference) {
    return mix(groupId * 31 + rowReference.hash(strategy));
  }

  private boolean equals(long groupId, long rowId, long index) {
    return groupId == buffer.getGroupId(index) && strategy.equals(rowId, buffer.getRowId(index));
  }

  private boolean equals(long groupId, RowReference rowReference, long index) {
    return groupId == buffer.getGroupId(index)
        && rowReference.equals(strategy, buffer.getRowId(index));
  }

  private void insert(long index, long groupId, long rowId, long precomputedHash, long value) {
    buffer.set(index, groupId, rowId, precomputedHash, value);
    entryCount++;
    if (entryCount > maxFill) {
      // Grow past the load-factor threshold to keep probe chains short.
      rehash(bigArraySize(entryCount + 1, fillFactor));
    }
  }

  /**
   * Locate the index for the specified {@code groupId} and {@code rowId} key pair. If the index is
   * unpopulated, then return the index as the two's complement value (which will be negative).
   */
  private long find(long groupId, long rowId, long precomputedHash) {
    long index = precomputedHash & mask;
    if (buffer.isEmptySlot(index)) {
      return twosComplement(index);
    }
    if (precomputedHash == buffer.getPrecomputedHash(index) && equals(groupId, rowId, index)) {
      return index;
    }
    // There's always an unused entry.
    while (true) {
      index = (index + 1) & mask;
      if (buffer.isEmptySlot(index)) {
        return twosComplement(index);
      }
      if (precomputedHash == buffer.getPrecomputedHash(index) && equals(groupId, rowId, index)) {
        return index;
      }
    }
  }

  /**
   * Removes the (groupId, rowId) entry, returning its value or {@code defaultReturnValue} if the
   * key was absent.
   */
  public long remove(long groupId, long rowId) {
    checkArgument(groupId != unmappedGroupId, "Group ID cannot be the unmapped group ID");

    long hash = hash(groupId, rowId);
    long index = hash & mask;
    if (buffer.isEmptySlot(index)) {
      return defaultReturnValue;
    }
    if (hash == buffer.getPrecomputedHash(index) && equals(groupId, rowId, index)) {
      return removeEntry(index);
    }
    while (true) {
      index = (index + 1) & mask;
      if (buffer.isEmptySlot(index)) {
        return defaultReturnValue;
      }
      if (hash == buffer.getPrecomputedHash(index) && equals(groupId, rowId, index)) {
        return removeEntry(index);
      }
    }
  }

  private long removeEntry(long index) {
    long oldValue = buffer.getValue(index);
    entryCount--;
    // Open addressing requires compacting the probe chain after a removal.
    shiftKeys(index);
    return oldValue;
  }

  /**
   * Shifts left entries with the specified hash code, starting at the specified index, and empties
   * the resulting free entry.
   *
   * @param index a starting position.
   */
  private void shiftKeys(long index) {
    // Shift entries with the same hash.
    while (true) {
      long currentHash;

      long initialIndex = index;
      index = ((index) + 1) & mask;
      while (true) {
        if (buffer.isEmptySlot(index)) {
          buffer.clear(initialIndex);
          return;
        }
        currentHash = buffer.getPrecomputedHash(index);
        long slot = currentHash & mask;
        // Yes, this is dense logic. See fastutil Long2LongOpenCustomHashMap#shiftKeys
        // implementation.
        if (initialIndex <= index
            ? initialIndex >= slot || slot > index
            : initialIndex >= slot && slot > index) {
          break;
        }
        index = (index + 1) & mask;
      }
      buffer.set(
          initialIndex,
          buffer.getGroupId(index),
          buffer.getRowId(index),
          currentHash,
          buffer.getValue(index));
    }
  }

  private void rehash(long newTableSize) {
    long newMask = newTableSize - 1; // Note that this is used by the hashing macro
    Buffer newBuffer = new Buffer(newTableSize, unmappedGroupId);
    long index = tableSize;
    // Re-insert every live entry using its stored hash; the strategy is never consulted here.
    for (long i = entryCount; i > 0; i--) {
      index--;
      while (buffer.isEmptySlot(index)) {
        index--;
      }
      long hash = buffer.getPrecomputedHash(index);
      long newIndex = hash & newMask;
      if (!newBuffer.isEmptySlot(newIndex)) {
        newIndex = (newIndex + 1) & newMask;
        while (!newBuffer.isEmptySlot(newIndex)) {
          newIndex = (newIndex + 1) & newMask;
        }
      }
      newBuffer.set(
          newIndex, buffer.getGroupId(index), buffer.getRowId(index), hash, buffer.getValue(index));
    }
    tableSize = newTableSize;
    mask = newMask;
    maxFill = maxFill(tableSize, fillFactor);
    buffer = newBuffer;
  }

  private static long twosComplement(long value) {
    return -(value + 1);
  }

  /** Flat storage of hash-table entries over a {@link LongBigArray}. */
  private static class Buffer {
    private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(Buffer.class);

    private static final int POSITIONS_PER_ENTRY = 4;
    private static final int ROW_ID_OFFSET = 1;
    private static final int PRECOMPUTED_HASH_OFFSET = 2;
    private static final int VALUE_OFFSET = 3;

    /*
     * Memory layout:
     * [LONG] groupId1, [LONG] rowId1, [LONG] precomputedHash1, [LONG] value1
     * [LONG] groupId2, [LONG] rowId2, [LONG] precomputedHash2, [LONG] value2
     * ...
     */
    private final LongBigArray buffer;
    private final long unmappedGroupId;

    public Buffer(long positions, long unmappedGroupId) {
      // Initialize every slot's groupId field to the unmapped marker (empty).
      buffer = new LongBigArray(unmappedGroupId);
      buffer.ensureCapacity(positions * POSITIONS_PER_ENTRY);
      this.unmappedGroupId = unmappedGroupId;
    }

    public void set(long index, long groupId, long rowId, long precomputedHash, long value) {
      buffer.set(index * POSITIONS_PER_ENTRY, groupId);
      buffer.set(index * POSITIONS_PER_ENTRY + ROW_ID_OFFSET, rowId);
      buffer.set(index * POSITIONS_PER_ENTRY + PRECOMPUTED_HASH_OFFSET, precomputedHash);
      buffer.set(index * POSITIONS_PER_ENTRY + VALUE_OFFSET, value);
    }

    public void clear(long index) {
      // Since all fields of an index are set/unset together as a unit, we only need to choose one
      // field to serve
      // as a marker for empty slots. Group IDs have been arbitrarily selected for that purpose.
      buffer.set(index * POSITIONS_PER_ENTRY, unmappedGroupId);
    }

    public boolean isEmptySlot(long index) {
      return getGroupId(index) == unmappedGroupId;
    }

    public long getGroupId(long index) {
      return buffer.get(index * POSITIONS_PER_ENTRY);
    }

    public long getRowId(long index) {
      return buffer.get(index * POSITIONS_PER_ENTRY + ROW_ID_OFFSET);
    }

    public long getPrecomputedHash(long index) {
      return buffer.get(index * POSITIONS_PER_ENTRY + PRECOMPUTED_HASH_OFFSET);
    }

    public long getValue(long index) {
      return buffer.get(index * POSITIONS_PER_ENTRY + VALUE_OFFSET);
    }

    public void setValue(long index, long value) {
      buffer.set(index * POSITIONS_PER_ENTRY + VALUE_OFFSET, value);
    }

    public long sizeOf() {
      return INSTANCE_SIZE + buffer.sizeOf();
    }
  }

  /** Load-factor threshold: min(ceil(n*f), n-1) so at least one slot always stays empty. */
  public static long maxFill(long n, float f) {
    return Math.min((long) Math.ceil((double) ((float) n * f)), n - 1L);
  }

  /** Smallest power of two >= x (for x >= 1). */
  public static long nextPowerOfTwo(long x) {
    return 1L << 64 - Long.numberOfLeadingZeros(x - 1L);
  }

  /** Power-of-two table size large enough to hold {@code expected} entries at load factor f. */
  public static long bigArraySize(long expected, float f) {
    return nextPowerOfTwo((long) Math.ceil((double) ((float) expected / f)));
  }

  /** Finalizing mixer spreading the key's bits over the whole long (fastutil's mix). */
  public static long mix(long x) {
    long h = x * -7046029254386353131L;
    h ^= h >>> 32;
    return h ^ h >>> 16;
  }
}
package org.apache.iotdb.db.queryengine.execution.operator;

import org.apache.tsfile.read.common.block.TsBlock;

/**
 * Equality and hashing defined over a (TsBlock, position) row reference. Implementations choose
 * which columns participate; {@code hashCode} must be consistent with {@code equals} so rows that
 * compare equal hash to the same value.
 */
public interface TsBlockWithPositionEqualsAndHash {
  /**
   * Returns whether the row at {@code leftPosition} of {@code left} equals the row at
   * {@code rightPosition} of {@code right} on the participating columns.
   */
  boolean equals(TsBlock left, int leftPosition, TsBlock right, int rightPosition);

  /** Returns a hash of the row at {@code position} of {@code block}. */
  long hashCode(TsBlock block, int position);
}
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.hash.GroupByHash; @@ -150,16 +153,21 @@ private Supplier getGroupedTopNBuilderSupplier() { groupByHashSupplier.get()); } - // if (rankingType == TopKRankingNode.RankingType.RANK) { - // Comparator comparator = new SimpleTsBlockWithPositionComparator( - // sourceTypes, sortChannels, ascendingOrders); - // return () -> new GroupedTopNRankBuilder( - // sourceTypes, - // comparator, - // maxRankingPerPartition, - // generateRanking, - // groupByHashSupplier.get()); - // } + if (rankingType == TopKRankingNode.RankingType.RANK) { + TsBlockWithPositionComparator comparator = + new SimpleTsBlockWithPositionComparator(inputTypes, sortChannels, sortOrders); + TsBlockWithPositionEqualsAndHash equalsAndHash = + new SimpleTsBlockWithPositionEqualsAndHash(inputTypes, sortChannels); + return () -> + new GroupedTopNRankBuilder( + inputTypes, + comparator, + equalsAndHash, + maxRowCountPerPartition, + !partial, + partitionChannels.stream().mapToInt(Integer::intValue).toArray(), + groupByHashSupplier.get()); + } if (rankingType == TopKRankingNode.RankingType.DENSE_RANK) { throw new UnsupportedOperationException("DENSE_RANK not yet implemented"); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java index bb276f07150b9..ce37a5a9ed10e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java @@ -27,8 +27,10 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode; import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.RowNumberNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ValueFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode; @@ -139,6 +141,26 @@ public PlanNode visitPatternRecognition(PatternRecognitionNode node, Context con context.setCannotEliminateSort(true); return newNode; } + + @Override + public PlanNode visitTopKRanking(TopKRankingNode node, Context context) { + PlanNode newNode = node.clone(); + for (PlanNode child : node.getChildren()) { + newNode.addChild(child.accept(this, context)); + } + context.setCannotEliminateSort(true); + return newNode; + } + + @Override + public PlanNode visitRowNumber(RowNumberNode node, Context context) { + PlanNode newNode = node.clone(); + for (PlanNode child : node.getChildren()) { + newNode.addChild(child.accept(this, context)); + } + context.setCannotEliminateSort(true); + return newNode; + } } private static class Context { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java index 7eb6dfb81c97c..15da1d3908955 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java @@ -33,8 +33,10 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.RowNumberNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.UnionNode; import java.util.Map; @@ -163,6 +165,18 @@ public PlanNode visitAggregationTableScan(AggregationTableScanNode node, Context return visitTableScan(node, context); } + @Override + public PlanNode visitTopKRanking(TopKRankingNode node, Context context) { + context.setCanTransform(false); + return visitPlan(node, context); + } + + @Override + public PlanNode visitRowNumber(RowNumberNode node, Context context) { + context.setCanTransform(false); + return visitPlan(node, context); + } + @Override public PlanNode visitUnion(UnionNode node, Context context) { context.setCanTransform(false); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TopKRankingOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TopKRankingOperatorTest.java new file mode 100644 index 0000000000000..1d50b42673dc7 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TopKRankingOperatorTest.java @@ -0,0 +1,696 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.window; + +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; +import org.apache.iotdb.db.queryengine.common.PlanFragmentId; +import org.apache.iotdb.db.queryengine.common.QueryId; +import org.apache.iotdb.db.queryengine.execution.driver.DriverContext; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine; +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.execution.operator.process.TreeLinearFillOperator; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode; + +import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; +import org.apache.tsfile.utils.Binary; +import org.junit.Test; + 
+import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutorService; + +import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class TopKRankingOperatorTest { + private static final ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool( + 1, "topKRankingOperator-test-instance-notification"); + + @Test + public void testTopKWithPartition() { + // Input: 4 rows for d1, 3 rows for d2 + // Sort by value (column 2) ascending, top 2 per partition + long[][] timeArray = {{1, 2, 3, 4, 5, 6, 7}}; + String[][] deviceArray = {{"d1", "d1", "d1", "d1", "d2", "d2", "d2"}}; + int[][] valueArray = {{5, 3, 1, 4, 6, 2, 1}}; + + // Expected: top 2 per partition sorted by value ASC + // d1: value=1(rn=1), value=3(rn=2) + // d2: value=1(rn=1), value=2(rn=2) + Map> expectedByDevice = new HashMap<>(); + expectedByDevice.put("d1", Arrays.asList(new int[] {1, 1}, new int[] {3, 2})); + expectedByDevice.put("d2", Arrays.asList(new int[] {1, 1}, new int[] {2, 2})); + + verifyTopKResultsByPartition( + timeArray, + deviceArray, + valueArray, + Collections.singletonList(1), + Collections.singletonList(TSDataType.TEXT), + Collections.singletonList(2), + Collections.singletonList(SortOrder.ASC_NULLS_LAST), + 2, + false, + expectedByDevice, + 4); + } + + @Test + public void testTopKWithPartitionDescending() { + long[][] timeArray = {{1, 2, 3, 4, 5, 6}}; + String[][] deviceArray = {{"d1", "d1", "d1", "d2", "d2", "d2"}}; + int[][] valueArray = {{5, 3, 1, 6, 2, 4}}; + + // top 2 per 
partition sorted by value DESC + // d1: value=5(rn=1), value=3(rn=2) + // d2: value=6(rn=1), value=4(rn=2) + Map> expectedByDevice = new HashMap<>(); + expectedByDevice.put("d1", Arrays.asList(new int[] {5, 1}, new int[] {3, 2})); + expectedByDevice.put("d2", Arrays.asList(new int[] {6, 1}, new int[] {4, 2})); + + verifyTopKResultsByPartition( + timeArray, + deviceArray, + valueArray, + Collections.singletonList(1), + Collections.singletonList(TSDataType.TEXT), + Collections.singletonList(2), + Collections.singletonList(SortOrder.DESC_NULLS_LAST), + 2, + false, + expectedByDevice, + 4); + } + + @Test + public void testTopKWithoutPartition() { + // No partition: all rows in one group + long[][] timeArray = {{1, 2, 3, 4, 5}}; + String[][] deviceArray = {{"d1", "d1", "d2", "d2", "d2"}}; + int[][] valueArray = {{5, 3, 1, 4, 2}}; + + // top 3 globally sorted by value ASC: value=1(rn=1), value=2(rn=2), value=3(rn=3) + int[][] expectedValueAndRn = {{1, 1}, {2, 2}, {3, 3}}; + + verifyTopKResultsGlobal( + timeArray, + deviceArray, + valueArray, + Collections.emptyList(), + Collections.emptyList(), + Collections.singletonList(2), + Collections.singletonList(SortOrder.ASC_NULLS_LAST), + 3, + false, + expectedValueAndRn, + 3); + } + + @Test + public void testTopKWithMultipleTsBlocks() { + long[][] timeArray = {{1, 2, 3}, {4, 5}, {6, 7}}; + String[][] deviceArray = {{"d1", "d1", "d1"}, {"d2", "d2"}, {"d2", "d2"}}; + int[][] valueArray = {{5, 3, 1}, {6, 2}, {4, 1}}; + + // top 2 per partition sorted by value ASC + // d1: value=1(rn=1), value=3(rn=2) + // d2: value=1(rn=1), value=2(rn=2) + Map> expectedByDevice = new HashMap<>(); + expectedByDevice.put("d1", Arrays.asList(new int[] {1, 1}, new int[] {3, 2})); + expectedByDevice.put("d2", Arrays.asList(new int[] {1, 1}, new int[] {2, 2})); + + verifyTopKResultsByPartition( + timeArray, + deviceArray, + valueArray, + Collections.singletonList(1), + Collections.singletonList(TSDataType.TEXT), + Collections.singletonList(2), + 
Collections.singletonList(SortOrder.ASC_NULLS_LAST), + 2, + false, + expectedByDevice, + 4); + } + + @Test + public void testTopKWithTopOne() { + long[][] timeArray = {{1, 2, 3, 4}}; + String[][] deviceArray = {{"d1", "d1", "d2", "d2"}}; + int[][] valueArray = {{5, 3, 6, 2}}; + + // top 1 per partition sorted by value ASC + // d1: value=3(rn=1) + // d2: value=2(rn=1) + Map> expectedByDevice = new HashMap<>(); + expectedByDevice.put("d1", Collections.singletonList(new int[] {3, 1})); + expectedByDevice.put("d2", Collections.singletonList(new int[] {2, 1})); + + verifyTopKResultsByPartition( + timeArray, + deviceArray, + valueArray, + Collections.singletonList(1), + Collections.singletonList(TSDataType.TEXT), + Collections.singletonList(2), + Collections.singletonList(SortOrder.ASC_NULLS_LAST), + 1, + false, + expectedByDevice, + 2); + } + + // ==================== RANK Tests ==================== + + @Test + public void testRankWithPartitionAndTies() { + // d1: values [5, 3, 3, 1], d2: values [6, 2, 2] + // topN=2 ASC → d1 keeps rank≤2: 1(r=1),3(r=2),3(r=2); d2 keeps rank≤2: 2(r=1),2(r=1) + long[][] timeArray = {{1, 2, 3, 4, 5, 6, 7}}; + String[][] deviceArray = {{"d1", "d1", "d1", "d1", "d2", "d2", "d2"}}; + int[][] valueArray = {{5, 3, 3, 1, 6, 2, 2}}; + + Map> expectedByDevice = new HashMap<>(); + expectedByDevice.put("d1", Arrays.asList(new int[] {1, 1}, new int[] {3, 2}, new int[] {3, 2})); + expectedByDevice.put("d2", Arrays.asList(new int[] {2, 1}, new int[] {2, 1})); + + verifyTopKResultsByPartition( + timeArray, + deviceArray, + valueArray, + Collections.singletonList(1), + Collections.singletonList(TSDataType.TEXT), + Collections.singletonList(2), + Collections.singletonList(SortOrder.ASC_NULLS_LAST), + 2, + false, + TopKRankingNode.RankingType.RANK, + expectedByDevice, + 5); + } + + @Test + public void testRankWithPartitionDescendingAndTies() { + // d1: values [5, 3, 3, 1] DESC → 5(r=1),3(r=2),3(r=2),1(r=4) → keep rank≤2 + // d2: values [6, 2, 4] DESC → 
6(r=1),4(r=2),2(r=3) → keep rank≤2 + long[][] timeArray = {{1, 2, 3, 4, 5, 6}}; + String[][] deviceArray = {{"d1", "d1", "d1", "d1", "d2", "d2"}}; + int[][] valueArray = {{5, 3, 3, 1, 6, 4}}; + + Map> expectedByDevice = new HashMap<>(); + expectedByDevice.put("d1", Arrays.asList(new int[] {5, 1}, new int[] {3, 2}, new int[] {3, 2})); + expectedByDevice.put("d2", Arrays.asList(new int[] {6, 1}, new int[] {4, 2})); + + verifyTopKResultsByPartition( + timeArray, + deviceArray, + valueArray, + Collections.singletonList(1), + Collections.singletonList(TSDataType.TEXT), + Collections.singletonList(2), + Collections.singletonList(SortOrder.DESC_NULLS_LAST), + 2, + false, + TopKRankingNode.RankingType.RANK, + expectedByDevice, + 5); + } + + @Test + public void testRankWithoutPartitionAndTies() { + // Global: values [5, 3, 1, 3, 2] ASC → 1(r=1),2(r=2),3(r=3),3(r=3),5(r=5) → keep rank≤3 + long[][] timeArray = {{1, 2, 3, 4, 5}}; + String[][] deviceArray = {{"d1", "d1", "d2", "d2", "d2"}}; + int[][] valueArray = {{5, 3, 1, 3, 2}}; + + int[][] expectedValueAndRank = {{1, 1}, {2, 2}, {3, 3}, {3, 3}}; + + verifyTopKResultsGlobal( + timeArray, + deviceArray, + valueArray, + Collections.emptyList(), + Collections.emptyList(), + Collections.singletonList(2), + Collections.singletonList(SortOrder.ASC_NULLS_LAST), + 3, + false, + TopKRankingNode.RankingType.RANK, + expectedValueAndRank, + 4); + } + + @Test + public void testRankWithMultipleTsBlocksAndTies() { + // Same data as testRankWithPartitionAndTies, split across blocks + long[][] timeArray = {{1, 2, 3}, {4, 5}, {6, 7}}; + String[][] deviceArray = {{"d1", "d1", "d1"}, {"d1", "d2"}, {"d2", "d2"}}; + int[][] valueArray = {{5, 3, 3}, {1, 6}, {2, 2}}; + + Map> expectedByDevice = new HashMap<>(); + expectedByDevice.put("d1", Arrays.asList(new int[] {1, 1}, new int[] {3, 2}, new int[] {3, 2})); + expectedByDevice.put("d2", Arrays.asList(new int[] {2, 1}, new int[] {2, 1})); + + verifyTopKResultsByPartition( + timeArray, + deviceArray, 
+ valueArray, + Collections.singletonList(1), + Collections.singletonList(TSDataType.TEXT), + Collections.singletonList(2), + Collections.singletonList(SortOrder.ASC_NULLS_LAST), + 2, + false, + TopKRankingNode.RankingType.RANK, + expectedByDevice, + 5); + } + + @Test + public void testRankTopOneWithTies() { + // d1: values [5, 3], d2: values [2, 2] + // topN=1 ASC → d1: 3(r=1); d2: 2(r=1),2(r=1) (ties at rank 1 are all kept) + long[][] timeArray = {{1, 2, 3, 4}}; + String[][] deviceArray = {{"d1", "d1", "d2", "d2"}}; + int[][] valueArray = {{5, 3, 2, 2}}; + + Map> expectedByDevice = new HashMap<>(); + expectedByDevice.put("d1", Collections.singletonList(new int[] {3, 1})); + expectedByDevice.put("d2", Arrays.asList(new int[] {2, 1}, new int[] {2, 1})); + + verifyTopKResultsByPartition( + timeArray, + deviceArray, + valueArray, + Collections.singletonList(1), + Collections.singletonList(TSDataType.TEXT), + Collections.singletonList(2), + Collections.singletonList(SortOrder.ASC_NULLS_LAST), + 1, + false, + TopKRankingNode.RankingType.RANK, + expectedByDevice, + 3); + } + + @Test + public void testRankNoTiesBehavesLikeRowNumber() { + // When no ties, rank should produce the same results as row_number + long[][] timeArray = {{1, 2, 3, 4, 5, 6, 7}}; + String[][] deviceArray = {{"d1", "d1", "d1", "d1", "d2", "d2", "d2"}}; + int[][] valueArray = {{5, 3, 1, 4, 6, 2, 7}}; + + Map> expectedByDevice = new HashMap<>(); + expectedByDevice.put("d1", Arrays.asList(new int[] {1, 1}, new int[] {3, 2})); + expectedByDevice.put("d2", Arrays.asList(new int[] {2, 1}, new int[] {6, 2})); + + verifyTopKResultsByPartition( + timeArray, + deviceArray, + valueArray, + Collections.singletonList(1), + Collections.singletonList(TSDataType.TEXT), + Collections.singletonList(2), + Collections.singletonList(SortOrder.ASC_NULLS_LAST), + 2, + false, + TopKRankingNode.RankingType.RANK, + expectedByDevice, + 4); + } + + /** + * Verifies top-K results grouped by partition (device). 
The output order between partitions is + * not guaranteed, so we group results by device and verify each partition independently. + */ + private void verifyTopKResultsByPartition( + long[][] timeArray, + String[][] deviceArray, + int[][] valueArray, + List partitionChannels, + List partitionTypes, + List sortChannels, + List sortOrders, + int maxRowCountPerPartition, + boolean partial, + Map> expectedByDevice, + int expectedTotalCount) { + verifyTopKResultsByPartition( + timeArray, + deviceArray, + valueArray, + partitionChannels, + partitionTypes, + sortChannels, + sortOrders, + maxRowCountPerPartition, + partial, + TopKRankingNode.RankingType.ROW_NUMBER, + expectedByDevice, + expectedTotalCount); + } + + private void verifyTopKResultsByPartition( + long[][] timeArray, + String[][] deviceArray, + int[][] valueArray, + List partitionChannels, + List partitionTypes, + List sortChannels, + List sortOrders, + int maxRowCountPerPartition, + boolean partial, + TopKRankingNode.RankingType rankingType, + Map> expectedByDevice, + int expectedTotalCount) { + + Map> actualByDevice = new HashMap<>(); + int count = 0; + + try (TopKRankingOperator operator = + genTopKRankingOperator( + timeArray, + deviceArray, + valueArray, + partitionChannels, + partitionTypes, + sortChannels, + sortOrders, + maxRowCountPerPartition, + partial, + rankingType)) { + while (!operator.isFinished()) { + if (operator.hasNext()) { + TsBlock tsBlock = operator.next(); + if (tsBlock != null && !tsBlock.isEmpty()) { + int numColumns = tsBlock.getValueColumnCount(); + for (int i = 0; i < tsBlock.getPositionCount(); i++, count++) { + String device = + tsBlock.getColumn(1).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET); + int value = tsBlock.getColumn(2).getInt(i); + long rowNumber = tsBlock.getColumn(numColumns - 1).getLong(i); + actualByDevice + .computeIfAbsent(device, k -> new ArrayList<>()) + .add(new int[] {value, (int) rowNumber}); + } + } + } + } + assertEquals(expectedTotalCount, 
count); + + for (Map.Entry> entry : expectedByDevice.entrySet()) { + String device = entry.getKey(); + List expectedRows = entry.getValue(); + List actualRows = actualByDevice.get(device); + + assertTrue("Missing partition for device: " + device, actualRows != null); + assertEquals( + "Row count mismatch for device " + device, expectedRows.size(), actualRows.size()); + for (int i = 0; i < expectedRows.size(); i++) { + assertEquals( + "Value mismatch at row " + i + " for device " + device, + expectedRows.get(i)[0], + actualRows.get(i)[0]); + assertEquals( + "Row number mismatch at row " + i + " for device " + device, + expectedRows.get(i)[1], + actualRows.get(i)[1]); + } + } + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + private void verifyTopKResultsGlobal( + long[][] timeArray, + String[][] deviceArray, + int[][] valueArray, + List partitionChannels, + List partitionTypes, + List sortChannels, + List sortOrders, + int maxRowCountPerPartition, + boolean partial, + int[][] expectedValueAndRn, + int expectedTotalCount) { + verifyTopKResultsGlobal( + timeArray, + deviceArray, + valueArray, + partitionChannels, + partitionTypes, + sortChannels, + sortOrders, + maxRowCountPerPartition, + partial, + TopKRankingNode.RankingType.ROW_NUMBER, + expectedValueAndRn, + expectedTotalCount); + } + + private void verifyTopKResultsGlobal( + long[][] timeArray, + String[][] deviceArray, + int[][] valueArray, + List partitionChannels, + List partitionTypes, + List sortChannels, + List sortOrders, + int maxRowCountPerPartition, + boolean partial, + TopKRankingNode.RankingType rankingType, + int[][] expectedValueAndRn, + int expectedTotalCount) { + + List results = new ArrayList<>(); + int count = 0; + + try (TopKRankingOperator operator = + genTopKRankingOperator( + timeArray, + deviceArray, + valueArray, + partitionChannels, + partitionTypes, + sortChannels, + sortOrders, + maxRowCountPerPartition, + partial, + rankingType)) { + while 
(!operator.isFinished()) { + if (operator.hasNext()) { + TsBlock tsBlock = operator.next(); + if (tsBlock != null && !tsBlock.isEmpty()) { + int numColumns = tsBlock.getValueColumnCount(); + for (int i = 0; i < tsBlock.getPositionCount(); i++, count++) { + int value = tsBlock.getColumn(2).getInt(i); + long rowNumber = tsBlock.getColumn(numColumns - 1).getLong(i); + results.add(new int[] {value, (int) rowNumber}); + } + } + } + } + assertEquals(expectedTotalCount, count); + for (int i = 0; i < expectedValueAndRn.length; i++) { + assertEquals("Value mismatch at row " + i, expectedValueAndRn[i][0], results.get(i)[0]); + assertEquals( + "Row number mismatch at row " + i, expectedValueAndRn[i][1], results.get(i)[1]); + } + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + private DriverContext createDriverContext() { + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0); + PlanNodeId planNode = new PlanNodeId("1"); + driverContext.addOperatorContext(1, planNode, TreeLinearFillOperator.class.getSimpleName()); + return driverContext; + } + + private TopKRankingOperator genTopKRankingOperator( + long[][] timeArray, + String[][] deviceArray, + int[][] valueArray, + List partitionChannels, + List partitionTypes, + List sortChannels, + List sortOrders, + int maxRowCountPerPartition, + boolean partial) { + return genTopKRankingOperator( + timeArray, + deviceArray, + valueArray, + partitionChannels, + partitionTypes, + sortChannels, + sortOrders, + maxRowCountPerPartition, + partial, + TopKRankingNode.RankingType.ROW_NUMBER); + } + + 
private TopKRankingOperator genTopKRankingOperator( + long[][] timeArray, + String[][] deviceArray, + int[][] valueArray, + List partitionChannels, + List partitionTypes, + List sortChannels, + List sortOrders, + int maxRowCountPerPartition, + boolean partial, + TopKRankingNode.RankingType rankingType) { + DriverContext driverContext = createDriverContext(); + + List inputDataTypes = + Arrays.asList(TSDataType.TIMESTAMP, TSDataType.TEXT, TSDataType.INT32); + List outputChannels = new ArrayList<>(); + for (int i = 0; i < inputDataTypes.size(); i++) { + outputChannels.add(i); + } + + Operator childOperator = new ChildOperator(timeArray, deviceArray, valueArray, driverContext); + return new TopKRankingOperator( + driverContext.getOperatorContexts().get(0), + childOperator, + rankingType, + inputDataTypes, + outputChannels, + partitionChannels, + partitionTypes, + sortChannels, + sortOrders, + maxRowCountPerPartition, + partial, + Optional.empty(), + 10, + Optional.empty()); + } + + static class ChildOperator implements Operator { + private int index; + private final long[][] timeArray; + private final String[][] deviceArray; + private final int[][] valueArray; + private final DriverContext driverContext; + + ChildOperator( + long[][] timeArray, + String[][] deviceArray, + int[][] valueArray, + DriverContext driverContext) { + this.timeArray = timeArray; + this.deviceArray = deviceArray; + this.valueArray = valueArray; + this.driverContext = driverContext; + this.index = 0; + } + + @Override + public OperatorContext getOperatorContext() { + return driverContext.getOperatorContexts().get(0); + } + + @Override + public TsBlock next() { + if (index >= timeArray.length) { + return null; + } + TsBlockBuilder builder = + new TsBlockBuilder( + timeArray[index].length, + Arrays.asList(TSDataType.TIMESTAMP, TSDataType.TEXT, TSDataType.INT32)); + for (int i = 0; i < timeArray[index].length; i++) { + builder.getColumnBuilder(0).writeLong(timeArray[index][i]); + builder + 
.getColumnBuilder(1) + .writeBinary(new Binary(deviceArray[index][i], TSFileConfig.STRING_CHARSET)); + builder.getColumnBuilder(2).writeInt(valueArray[index][i]); + } + builder.declarePositions(timeArray[index].length); + index++; + return builder.build( + new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, builder.getPositionCount())); + } + + @Override + public boolean hasNext() { + return index < timeArray.length; + } + + @Override + public boolean isFinished() { + return index >= timeArray.length; + } + + @Override + public void close() {} + + @Override + public long calculateMaxPeekMemory() { + return 0; + } + + @Override + public long calculateMaxReturnSize() { + return 0; + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return 0; + } + + @Override + public long ramBytesUsed() { + return 0; + } + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java index e31f2f7e58065..b80fda29e108c 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java @@ -122,7 +122,7 @@ public void testTopKRankingPushDown() { PlanTester planTester = new PlanTester(); String sql = - "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY tag1, tag2, tag3 ORDER BY s1) as rn FROM table1) WHERE rn <= 2"; + "SELECT * FROM (SELECT *, rank() OVER (PARTITION BY tag1, tag2, tag3 ORDER BY s1) as rn FROM table1) WHERE rn <= 2"; LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql); PlanMatchPattern tableScan = tableScan("testdb.table1"); @@ -161,7 +161,7 @@ public void testPushDownFilterIntoWindow() { PlanTester planTester = new 
PlanTester(); String sql = - "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY tag1 ORDER BY s1) as rn FROM table1) WHERE rn <= 2"; + "SELECT * FROM (SELECT *, rank() OVER (PARTITION BY tag1 ORDER BY s1) as rn FROM table1) WHERE rn <= 2"; LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql); PlanMatchPattern tableScan = tableScan("testdb.table1"); From ed79765e78bb8e2a6d6965278f883b51f6bbb746 Mon Sep 17 00:00:00 2001 From: Sh-Zh-7 Date: Sat, 21 Feb 2026 17:23:51 +0800 Subject: [PATCH 2/2] Add license header to all newly-added files. --- .../operator/GroupedTopNRankAccumulator.java | 19 +++++++++++++++++++ .../operator/RowIdComparisonHashStrategy.java | 19 +++++++++++++++++++ .../operator/TopNPeerGroupLookup.java | 19 +++++++++++++++++++ 3 files changed, 57 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNRankAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNRankAccumulator.java index a0830657eff7b..3154b7a4bae1f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNRankAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNRankAccumulator.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.iotdb.db.queryengine.execution.operator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.LongBigArray; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowIdComparisonHashStrategy.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowIdComparisonHashStrategy.java index 8e5fd2a79426b..acb63ce57f87d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowIdComparisonHashStrategy.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/RowIdComparisonHashStrategy.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + package org.apache.iotdb.db.queryengine.execution.operator; public interface RowIdComparisonHashStrategy extends RowIdComparisonStrategy, RowIdHashStrategy { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/TopNPeerGroupLookup.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/TopNPeerGroupLookup.java index dfb2e4257460f..ea9ff73d87d3d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/TopNPeerGroupLookup.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/TopNPeerGroupLookup.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.iotdb.db.queryengine.execution.operator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.LongBigArray;