From a16078c514780d46a882cb0bc5a9ce64ec3fd2e6 Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Tue, 24 Mar 2026 12:11:32 -0700 Subject: [PATCH 01/11] [Mustang] Add query routing and execution handoff for Parquet-backed indices (#5247) Implements the query routing and AnalyticsExecutionEngine for Project Mustang's unified query pipeline. PPL queries targeting parquet_ prefixed indices are routed through UnifiedQueryPlanner and executed via a stub QueryPlanExecutor, with results formatted through the existing JDBC response pipeline. New files: - QueryPlanExecutor: @FunctionalInterface contract for analytics engine - AnalyticsExecutionEngine: converts Iterable to QueryResponse with type mapping and query size limit enforcement - RestUnifiedQueryAction: orchestrates schema building, planning, execution on sql-worker thread pool, with client/server error classification and metrics - StubQueryPlanExecutor: canned data for parquet_logs and parquet_metrics tables for development and testing Modified files: - RestPPLQueryAction: routing branch for parquet_ indices - SQLPlugin: passes ClusterService and NodeClient to RestPPLQueryAction - plugin/build.gradle: adds :api dependency for UnifiedQueryPlanner Signed-off-by: Kai Huang Signed-off-by: Kai Huang --- .../analytics/AnalyticsExecutionEngine.java | 115 ++++++ .../executor/analytics/QueryPlanExecutor.java | 32 ++ .../AnalyticsExecutionEngineTest.java | 383 ++++++++++++++++++ .../opensearch/sql/ppl/AnalyticsPPLIT.java | 208 ++++++++++ plugin/build.gradle | 1 + .../org/opensearch/sql/plugin/SQLPlugin.java | 2 +- .../sql/plugin/rest/RestPPLQueryAction.java | 14 +- .../plugin/rest/RestUnifiedQueryAction.java | 263 ++++++++++++ .../plugin/rest/StubQueryPlanExecutor.java | 60 +++ .../rest/RestUnifiedQueryActionTest.java | 32 ++ 10 files changed, 1108 insertions(+), 2 deletions(-) create mode 100644 core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java create mode 100644 
core/src/main/java/org/opensearch/sql/executor/analytics/QueryPlanExecutor.java create mode 100644 core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java create mode 100644 integ-test/src/test/java/org/opensearch/sql/ppl/AnalyticsPPLIT.java create mode 100644 plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java create mode 100644 plugin/src/main/java/org/opensearch/sql/plugin/rest/StubQueryPlanExecutor.java create mode 100644 plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java diff --git a/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java b/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java new file mode 100644 index 00000000000..4159fa4845d --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java @@ -0,0 +1,115 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.executor.analytics; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.opensearch.sql.calcite.CalcitePlanContext; +import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory; +import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.data.model.ExprTupleValue; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.model.ExprValueUtils; +import org.opensearch.sql.data.type.ExprType; +import org.opensearch.sql.executor.ExecutionContext; +import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.executor.pagination.Cursor; +import org.opensearch.sql.planner.physical.PhysicalPlan; + +/** + * Execution engine adapter for the analytics engine 
(Project Mustang). + * + *
<p>
Bridges the analytics engine's {@link QueryPlanExecutor} with the SQL plugin's {@link + * ExecutionEngine} response pipeline. Takes a Calcite {@link RelNode}, delegates execution to the + * analytics engine, and converts the raw results into {@link QueryResponse}. + */ +public class AnalyticsExecutionEngine implements ExecutionEngine { + + private final QueryPlanExecutor planExecutor; + + public AnalyticsExecutionEngine(QueryPlanExecutor planExecutor) { + this.planExecutor = planExecutor; + } + + /** Not supported. Analytics queries use the RelNode path exclusively. */ + @Override + public void execute(PhysicalPlan plan, ResponseListener listener) { + listener.onFailure( + new UnsupportedOperationException("Analytics engine only supports RelNode execution")); + } + + /** Not supported. Analytics queries use the RelNode path exclusively. */ + @Override + public void execute( + PhysicalPlan plan, ExecutionContext context, ResponseListener listener) { + listener.onFailure( + new UnsupportedOperationException("Analytics engine only supports RelNode execution")); + } + + /** Not supported. Analytics queries use the RelNode path exclusively. 
*/ + @Override + public void explain(PhysicalPlan plan, ResponseListener listener) { + listener.onFailure( + new UnsupportedOperationException("Analytics engine only supports RelNode execution")); + } + + @Override + public void execute( + RelNode plan, CalcitePlanContext context, ResponseListener listener) { + try { + Integer querySizeLimit = context.sysLimit.querySizeLimit(); + Iterable rows = planExecutor.execute(plan, null); + + List fields = plan.getRowType().getFieldList(); + List results = convertRows(rows, fields, querySizeLimit); + Schema schema = buildSchema(fields); + + listener.onResponse(new QueryResponse(schema, results, Cursor.None)); + } catch (Exception e) { + listener.onFailure(e); + } + } + + private List convertRows( + Iterable rows, List fields, Integer querySizeLimit) { + List results = new ArrayList<>(); + for (Object[] row : rows) { + if (querySizeLimit != null && results.size() >= querySizeLimit) { + break; + } + Map valueMap = new LinkedHashMap<>(); + for (int i = 0; i < fields.size(); i++) { + String columnName = fields.get(i).getName(); + Object value = (i < row.length) ? 
row[i] : null; + valueMap.put(columnName, ExprValueUtils.fromObjectValue(value)); + } + results.add(ExprTupleValue.fromExprValueMap(valueMap)); + } + return results; + } + + private Schema buildSchema(List fields) { + List columns = new ArrayList<>(); + for (RelDataTypeField field : fields) { + ExprType exprType = convertType(field.getType()); + columns.add(new Schema.Column(field.getName(), null, exprType)); + } + return new Schema(columns); + } + + private ExprType convertType(RelDataType type) { + try { + return OpenSearchTypeFactory.convertRelDataTypeToExprType(type); + } catch (IllegalArgumentException e) { + return org.opensearch.sql.data.type.ExprCoreType.UNKNOWN; + } + } +} diff --git a/core/src/main/java/org/opensearch/sql/executor/analytics/QueryPlanExecutor.java b/core/src/main/java/org/opensearch/sql/executor/analytics/QueryPlanExecutor.java new file mode 100644 index 00000000000..fd322ca432a --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/executor/analytics/QueryPlanExecutor.java @@ -0,0 +1,32 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.executor.analytics; + +import org.apache.calcite.rel.RelNode; + +/** + * Executes a Calcite {@link RelNode} logical plan against the analytics engine. + * + *
<p>
This is a local equivalent of {@code org.opensearch.analytics.exec.QueryPlanExecutor} from the + * analytics-framework library. It will be replaced by the upstream interface once the + * analytics-framework JAR is published. + * + * @see Upstream + * QueryPlanExecutor + */ +@FunctionalInterface +public interface QueryPlanExecutor { + + /** + * Executes the given logical plan and returns result rows. + * + * @param plan the Calcite RelNode subtree to execute + * @param context execution context (opaque to avoid server dependency) + * @return rows produced by the engine + */ + Iterable execute(RelNode plan, Object context); +} diff --git a/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java b/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java new file mode 100644 index 00000000000..04e65b1d383 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java @@ -0,0 +1,383 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.executor.analytics; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.sql.type.SqlTypeFactoryImpl; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.jupiter.api.BeforeEach; 
+import org.junit.jupiter.api.Test; +import org.opensearch.sql.calcite.CalcitePlanContext; +import org.opensearch.sql.calcite.SysLimit; +import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.data.type.ExprCoreType; +import org.opensearch.sql.executor.ExecutionEngine.ExplainResponse; +import org.opensearch.sql.executor.ExecutionEngine.QueryResponse; +import org.opensearch.sql.planner.physical.PhysicalPlan; + +class AnalyticsExecutionEngineTest { + + private AnalyticsExecutionEngine engine; + private QueryPlanExecutor mockExecutor; + private CalcitePlanContext mockContext; + + @BeforeEach + void setUp() throws Exception { + mockExecutor = mock(QueryPlanExecutor.class); + engine = new AnalyticsExecutionEngine(mockExecutor); + mockContext = mock(CalcitePlanContext.class); + setSysLimit(mockContext, SysLimit.DEFAULT); + } + + /** Sets the public final sysLimit field on a mocked CalcitePlanContext. */ + private static void setSysLimit(CalcitePlanContext context, SysLimit sysLimit) throws Exception { + Field field = CalcitePlanContext.class.getDeclaredField("sysLimit"); + field.setAccessible(true); + field.set(context, sysLimit); + } + + @Test + void executeRelNode_basicTypesAndRows() { + RelNode relNode = mockRelNode("name", SqlTypeName.VARCHAR, "age", SqlTypeName.INTEGER); + Iterable rows = Arrays.asList(new Object[] {"Alice", 30}, new Object[] {"Bob", 25}); + when(mockExecutor.execute(relNode, null)).thenReturn(rows); + + QueryResponse response = executeAndCapture(relNode); + String dump = dumpResponse(response); + + // Schema: 2 columns [name:STRING, age:INTEGER] + assertEquals(2, response.getSchema().getColumns().size(), "Column count. 
" + dump); + assertEquals("name", response.getSchema().getColumns().get(0).getName(), dump); + assertEquals(ExprCoreType.STRING, response.getSchema().getColumns().get(0).getExprType(), dump); + assertEquals("age", response.getSchema().getColumns().get(1).getName(), dump); + assertEquals( + ExprCoreType.INTEGER, response.getSchema().getColumns().get(1).getExprType(), dump); + + // Rows: [{name=Alice, age=30}, {name=Bob, age=25}] + assertEquals(2, response.getResults().size(), "Row count. " + dump); + assertEquals( + "Alice", response.getResults().get(0).tupleValue().get("name").value(), "Row 0. " + dump); + assertEquals( + 30, response.getResults().get(0).tupleValue().get("age").value(), "Row 0. " + dump); + assertEquals( + "Bob", response.getResults().get(1).tupleValue().get("name").value(), "Row 1. " + dump); + assertEquals( + 25, response.getResults().get(1).tupleValue().get("age").value(), "Row 1. " + dump); + + // Cursor: None + assertEquals(org.opensearch.sql.executor.pagination.Cursor.None, response.getCursor(), dump); + } + + @Test + void executeRelNode_numericTypes() { + RelNode relNode = + mockRelNode( + "b", SqlTypeName.TINYINT, + "s", SqlTypeName.SMALLINT, + "i", SqlTypeName.INTEGER, + "l", SqlTypeName.BIGINT, + "f", SqlTypeName.FLOAT, + "d", SqlTypeName.DOUBLE); + Iterable rows = + Collections.singletonList(new Object[] {(byte) 1, (short) 2, 3, 4L, 5.0f, 6.0}); + when(mockExecutor.execute(relNode, null)).thenReturn(rows); + + QueryResponse response = executeAndCapture(relNode); + String dump = dumpResponse(response); + + assertEquals(ExprCoreType.BYTE, response.getSchema().getColumns().get(0).getExprType(), dump); + assertEquals(ExprCoreType.SHORT, response.getSchema().getColumns().get(1).getExprType(), dump); + assertEquals( + ExprCoreType.INTEGER, response.getSchema().getColumns().get(2).getExprType(), dump); + assertEquals(ExprCoreType.LONG, response.getSchema().getColumns().get(3).getExprType(), dump); + assertEquals(ExprCoreType.FLOAT, 
response.getSchema().getColumns().get(4).getExprType(), dump); + assertEquals(ExprCoreType.DOUBLE, response.getSchema().getColumns().get(5).getExprType(), dump); + + // Verify actual values + assertEquals( + (byte) 1, + response.getResults().get(0).tupleValue().get("b").value(), + "byte value. " + dump); + assertEquals( + (short) 2, + response.getResults().get(0).tupleValue().get("s").value(), + "short value. " + dump); + assertEquals( + 3, response.getResults().get(0).tupleValue().get("i").value(), "int value. " + dump); + assertEquals( + 4L, response.getResults().get(0).tupleValue().get("l").value(), "long value. " + dump); + assertEquals( + 5.0f, response.getResults().get(0).tupleValue().get("f").value(), "float value. " + dump); + assertEquals( + 6.0, response.getResults().get(0).tupleValue().get("d").value(), "double value. " + dump); + } + + @Test + void executeRelNode_temporalTypes() { + RelNode relNode = + mockRelNode("dt", SqlTypeName.DATE, "tm", SqlTypeName.TIME, "ts", SqlTypeName.TIMESTAMP); + Iterable emptyRows = Collections.emptyList(); + when(mockExecutor.execute(relNode, null)).thenReturn(emptyRows); + + QueryResponse response = executeAndCapture(relNode); + String dump = dumpResponse(response); + + assertEquals(ExprCoreType.DATE, response.getSchema().getColumns().get(0).getExprType(), dump); + assertEquals(ExprCoreType.TIME, response.getSchema().getColumns().get(1).getExprType(), dump); + assertEquals( + ExprCoreType.TIMESTAMP, response.getSchema().getColumns().get(2).getExprType(), dump); + assertEquals(0, response.getResults().size(), "Should have 0 rows. 
" + dump); + } + + @Test + void executeRelNode_querySizeLimit() throws Exception { + RelNode relNode = mockRelNode("id", SqlTypeName.INTEGER); + List manyRows = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + manyRows.add(new Object[] {i}); + } + when(mockExecutor.execute(relNode, null)).thenReturn(manyRows); + setSysLimit(mockContext, new SysLimit(10, 10000, 50000)); + + QueryResponse response = executeAndCapture(relNode); + String dump = dumpResponse(response); + + assertEquals( + 10, + response.getResults().size(), + "Should truncate to querySizeLimit=10, got " + response.getResults().size() + ". " + dump); + } + + @Test + void executeRelNode_emptyResults() { + RelNode relNode = mockRelNode("name", SqlTypeName.VARCHAR); + Iterable emptyRows = Collections.emptyList(); + when(mockExecutor.execute(relNode, null)).thenReturn(emptyRows); + + QueryResponse response = executeAndCapture(relNode); + String dump = dumpResponse(response); + + assertEquals(1, response.getSchema().getColumns().size(), "Schema column count. " + dump); + assertEquals(0, response.getResults().size(), "Row count should be 0. " + dump); + } + + @Test + void executeRelNode_nullValues() { + RelNode relNode = mockRelNode("name", SqlTypeName.VARCHAR, "age", SqlTypeName.INTEGER); + Iterable rows = Collections.singletonList(new Object[] {null, null}); + when(mockExecutor.execute(relNode, null)).thenReturn(rows); + + QueryResponse response = executeAndCapture(relNode); + String dump = dumpResponse(response); + + assertEquals(1, response.getResults().size(), "Row count. " + dump); + assertTrue( + response.getResults().get(0).tupleValue().get("name").isNull(), + "name should be null. " + dump); + assertTrue( + response.getResults().get(0).tupleValue().get("age").isNull(), + "age should be null. 
" + dump); + } + + @Test + void executeRelNode_errorPropagation() { + RelNode relNode = mockRelNode("id", SqlTypeName.INTEGER); + when(mockExecutor.execute(relNode, null)).thenThrow(new RuntimeException("Engine failure")); + + Exception error = executeAndCaptureError(relNode); + System.out.println(dumpError("executeRelNode_errorPropagation", error)); + + assertEquals( + "Engine failure", + error.getMessage(), + "Exception type: " + error.getClass().getSimpleName() + ", message: " + error.getMessage()); + } + + @Test + void physicalPlanExecute_callsOnFailure() { + PhysicalPlan physicalPlan = mock(PhysicalPlan.class); + AtomicReference errorRef = new AtomicReference<>(); + engine.execute(physicalPlan, failureListener(errorRef)); + + assertNotNull(errorRef.get(), "onFailure should have been called"); + System.out.println(dumpError("physicalPlanExecute_callsOnFailure", errorRef.get())); + assertTrue( + errorRef.get() instanceof UnsupportedOperationException, + "Expected UnsupportedOperationException, got: " + + errorRef.get().getClass().getSimpleName() + + " - " + + errorRef.get().getMessage()); + } + + @Test + void physicalPlanExecuteWithContext_callsOnFailure() { + PhysicalPlan physicalPlan = mock(PhysicalPlan.class); + AtomicReference errorRef = new AtomicReference<>(); + engine.execute( + physicalPlan, + org.opensearch.sql.executor.ExecutionContext.emptyExecutionContext(), + failureListener(errorRef)); + + assertNotNull(errorRef.get(), "onFailure should have been called"); + System.out.println(dumpError("physicalPlanExecuteWithContext_callsOnFailure", errorRef.get())); + assertTrue( + errorRef.get() instanceof UnsupportedOperationException, + "Expected UnsupportedOperationException, got: " + + errorRef.get().getClass().getSimpleName() + + " - " + + errorRef.get().getMessage()); + } + + @Test + void physicalPlanExplain_callsOnFailure() { + PhysicalPlan physicalPlan = mock(PhysicalPlan.class); + AtomicReference errorRef = new AtomicReference<>(); + 
engine.explain(physicalPlan, explainFailureListener(errorRef)); + + assertNotNull(errorRef.get(), "onFailure should have been called"); + System.out.println(dumpError("physicalPlanExplain_callsOnFailure", errorRef.get())); + assertTrue( + errorRef.get() instanceof UnsupportedOperationException, + "Expected UnsupportedOperationException, got: " + + errorRef.get().getClass().getSimpleName() + + " - " + + errorRef.get().getMessage()); + } + + // --- helpers --- + + private QueryResponse executeAndCapture(RelNode relNode) { + AtomicReference ref = new AtomicReference<>(); + engine.execute(relNode, mockContext, captureListener(ref)); + assertNotNull(ref.get(), "QueryResponse should not be null"); + // Always print the full response so test output shows exact results + System.out.println(dumpResponse(ref.get())); + return ref.get(); + } + + private Exception executeAndCaptureError(RelNode relNode) { + AtomicReference ref = new AtomicReference<>(); + engine.execute( + relNode, + mockContext, + new ResponseListener() { + @Override + public void onResponse(QueryResponse response) {} + + @Override + public void onFailure(Exception e) { + ref.set(e); + } + }); + assertNotNull(ref.get(), "onFailure should have been called"); + return ref.get(); + } + + private ResponseListener failureListener(AtomicReference ref) { + return new ResponseListener() { + @Override + public void onResponse(QueryResponse response) {} + + @Override + public void onFailure(Exception e) { + ref.set(e); + } + }; + } + + private ResponseListener explainFailureListener(AtomicReference ref) { + return new ResponseListener() { + @Override + public void onResponse(ExplainResponse response) {} + + @Override + public void onFailure(Exception e) { + ref.set(e); + } + }; + } + + private String dumpError(String testName, Exception e) { + return "\n--- " + + testName + + " ---\n" + + "Exception: " + + e.getClass().getSimpleName() + + "\n" + + "Message: " + + e.getMessage() + + "\n--- End ---"; + } + + /** Dumps 
the full QueryResponse into a readable string for test output and assertion messages. */ + private String dumpResponse(QueryResponse response) { + StringBuilder sb = new StringBuilder(); + sb.append("\n--- QueryResponse ---\n"); + + sb.append("Schema: ["); + sb.append( + response.getSchema().getColumns().stream() + .map(c -> c.getName() + ":" + c.getExprType().typeName()) + .collect(Collectors.joining(", "))); + sb.append("]\n"); + + sb.append("Rows (").append(response.getResults().size()).append("):\n"); + for (int i = 0; i < response.getResults().size(); i++) { + sb.append(" [").append(i).append("] "); + sb.append(response.getResults().get(i).tupleValue()); + sb.append("\n"); + } + + sb.append("Cursor: ").append(response.getCursor()).append("\n"); + sb.append("--- End ---"); + return sb.toString(); + } + + private RelNode mockRelNode(Object... nameTypePairs) { + SqlTypeFactoryImpl typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); + RelDataTypeFactory.Builder builder = typeFactory.builder(); + for (int i = 0; i < nameTypePairs.length; i += 2) { + String name = (String) nameTypePairs[i]; + SqlTypeName typeName = (SqlTypeName) nameTypePairs[i + 1]; + builder.add(name, typeName); + } + RelDataType rowType = builder.build(); + + RelNode relNode = mock(RelNode.class); + when(relNode.getRowType()).thenReturn(rowType); + return relNode; + } + + private ResponseListener captureListener(AtomicReference ref) { + return new ResponseListener() { + @Override + public void onResponse(QueryResponse response) { + ref.set(response); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError("Unexpected failure", e); + } + }; + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/AnalyticsPPLIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/AnalyticsPPLIT.java new file mode 100644 index 00000000000..e7788ae5c67 --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/AnalyticsPPLIT.java @@ -0,0 +1,208 @@ +/* + 
* Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ppl; + +import static org.opensearch.sql.legacy.TestUtils.getResponseBody; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT; +import static org.opensearch.sql.util.MatcherUtils.rows; +import static org.opensearch.sql.util.MatcherUtils.schema; +import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; +import static org.opensearch.sql.util.MatcherUtils.verifyNumOfRows; +import static org.opensearch.sql.util.MatcherUtils.verifySchema; + +import java.io.IOException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.json.JSONObject; +import org.junit.Test; +import org.opensearch.client.ResponseException; + +/** + * Integration tests for PPL queries routed through the analytics engine path (Project Mustang). + * Queries targeting "parquet_*" indices are routed to {@code RestUnifiedQueryAction} which uses + * {@code AnalyticsExecutionEngine} with a stub {@code QueryPlanExecutor}. + * + *
<p>
These tests validate the full pipeline: REST request -> routing -> planning via + * UnifiedQueryPlanner -> execution via AnalyticsExecutionEngine -> response formatting. + * + *
<p>
The stub executor always returns the full table rows regardless of the logical plan. After + * projection (| fields), the execution engine maps row values by position -- so projected columns + * get the values from the corresponding positions in the full row, not the actual projected column. + * This is expected behavior for a stub; the real analytics engine will evaluate the plan correctly. + */ +public class AnalyticsPPLIT extends PPLIntegTestCase { + + private static final Logger LOG = LogManager.getLogger(AnalyticsPPLIT.class); + + @Override + protected void init() throws Exception { + // No index loading needed -- stub schema and data are hardcoded + // in RestUnifiedQueryAction and StubQueryPlanExecutor + } + + // --- Full table scan tests with schema + data verification --- + + @Test + public void testBasicQuerySchemaAndData() throws IOException { + String query = "source = opensearch.parquet_logs"; + JSONObject result = executeQuery(query); + LOG.info("[testBasicQuerySchemaAndData] query: {}\nresponse: {}", query, result.toString(2)); + + verifySchema( + result, + schema("ts", "timestamp"), + schema("status", "integer"), + schema("message", "keyword"), + schema("ip_addr", "keyword")); + verifyNumOfRows(result, 3); + verifyDataRows( + result, + rows("2024-01-15 10:30:00", 200, "Request completed", "192.168.1.1"), + rows("2024-01-15 10:31:00", 200, "Health check OK", "192.168.1.2"), + rows("2024-01-15 10:32:00", 500, "Internal server error", "192.168.1.3")); + } + + @Test + public void testParquetMetricsSchemaAndData() throws IOException { + String query = "source = opensearch.parquet_metrics"; + JSONObject result = executeQuery(query); + LOG.info( + "[testParquetMetricsSchemaAndData] query: {}\nresponse: {}", query, result.toString(2)); + + verifySchema( + result, + schema("ts", "timestamp"), + schema("cpu", "double"), + schema("memory", "double"), + schema("host", "keyword")); + verifyNumOfRows(result, 2); + verifyDataRows( + result, + rows("2024-01-15 
10:30:00", 75.5, 8192.5, "host-1"), + rows("2024-01-15 10:31:00", 82.3, 7680.5, "host-2")); + } + + // --- Response format validation --- + + @Test + public void testResponseFormatHasRequiredFields() throws IOException { + String query = "source = opensearch.parquet_logs"; + JSONObject result = executeQuery(query); + LOG.info( + "[testResponseFormatHasRequiredFields] query: {}\nresponse: {}", query, result.toString(2)); + + String msg = "Full response: " + result.toString(2); + assertTrue("Response missing 'schema'. " + msg, result.has("schema")); + assertTrue("Response missing 'datarows'. " + msg, result.has("datarows")); + assertTrue("Response missing 'total'. " + msg, result.has("total")); + assertTrue("Response missing 'size'. " + msg, result.has("size")); + assertTrue("Response missing 'status'. " + msg, result.has("status")); + assertEquals( + "Expected status 200 but got " + result.getInt("status") + ". " + msg, + 200, + result.getInt("status")); + } + + @Test + public void testTotalAndSizeMatchRowCount() throws IOException { + String query = "source = opensearch.parquet_logs"; + JSONObject result = executeQuery(query); + LOG.info("[testTotalAndSizeMatchRowCount] query: {}\nresponse: {}", query, result.toString(2)); + + int rowCount = result.getJSONArray("datarows").length(); + assertEquals( + String.format( + "total should match row count. rows=%d, total=%d, size=%d. Response: %s", + rowCount, result.getInt("total"), result.getInt("size"), result.toString(2)), + rowCount, + result.getInt("total")); + assertEquals( + String.format( + "size should match row count. rows=%d, size=%d. 
Response: %s", + rowCount, result.getInt("size"), result.toString(2)), + rowCount, + result.getInt("size")); + } + + // --- Projection tests (schema verification -- stub doesn't evaluate projections) --- + + @Test + public void testFieldsProjectionChangesSchema() throws IOException { + String query = "source = opensearch.parquet_logs | fields ts, message"; + JSONObject result = executeQuery(query); + LOG.info( + "[testFieldsProjectionChangesSchema] query: {}\nresponse: {}", query, result.toString(2)); + + verifySchema(result, schema("ts", "timestamp"), schema("message", "keyword")); + verifyNumOfRows(result, 3); + } + + @Test + public void testSingleFieldProjection() throws IOException { + String query = "source = opensearch.parquet_logs | fields status"; + JSONObject result = executeQuery(query); + LOG.info("[testSingleFieldProjection] query: {}\nresponse: {}", query, result.toString(2)); + + verifySchema(result, schema("status", "integer")); + verifyNumOfRows(result, 3); + } + + // --- Error handling tests --- + + @Test + public void testSyntaxErrorReturnsClientError() throws IOException { + String query = "source = opensearch.parquet_logs | invalid_command"; + ResponseException e = assertThrows(ResponseException.class, () -> executeQuery(query)); + int statusCode = e.getResponse().getStatusLine().getStatusCode(); + String responseBody = getResponseBody(e.getResponse(), true); + LOG.info( + "[testSyntaxErrorReturnsClientError] query: {}\nstatus: {}\nresponse: {}", + query, + statusCode, + responseBody); + + assertTrue( + String.format( + "Syntax error should return 4xx, got %d. 
Response: %s", statusCode, responseBody), + statusCode >= 400 && statusCode < 500); + } + + // --- Regression tests --- + + @Test + public void testNonParquetQueryStillWorks() throws IOException { + loadIndex(Index.ACCOUNT); + String query = String.format("source=%s | head 1 | fields firstname", TEST_INDEX_ACCOUNT); + JSONObject result = executeQuery(query); + LOG.info("[testNonParquetQueryStillWorks] query: {}\nresponse: {}", query, result.toString(2)); + + assertNotNull("Non-parquet query returned null. Query: " + query, result); + assertTrue( + "Non-parquet query missing 'datarows'. Response: " + result.toString(2), + result.has("datarows")); + int rowCount = result.getJSONArray("datarows").length(); + assertTrue( + String.format( + "Non-parquet query returned 0 rows. Expected > 0. Response: %s", result.toString(2)), + rowCount > 0); + } + + @Test + public void testNonParquetAggregationStillWorks() throws IOException { + loadIndex(Index.ACCOUNT); + String query = String.format("source=%s | stats count()", TEST_INDEX_ACCOUNT); + JSONObject result = executeQuery(query); + LOG.info( + "[testNonParquetAggregationStillWorks] query: {}\nresponse: {}", query, result.toString(2)); + + int total = result.getInt("total"); + assertTrue( + String.format( + "Non-parquet aggregation returned total=%d, expected > 0. 
Response: %s", + total, result.toString(2)), + total > 0); + } +} diff --git a/plugin/build.gradle b/plugin/build.gradle index 340787fa01f..7b757759e13 100644 --- a/plugin/build.gradle +++ b/plugin/build.gradle @@ -160,6 +160,7 @@ dependencies { api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson_annotations}" api project(":ppl") + api project(':api') api project(':legacy') api project(':opensearch') api project(':prometheus') diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index edffd65f6bf..016d53dc8de 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -163,7 +163,7 @@ public List getRestHandlers( Metrics.getInstance().registerDefaultMetrics(); return Arrays.asList( - new RestPPLQueryAction(), + new RestPPLQueryAction(clusterService, this.client), new RestPPLGrammarAction(), new RestSqlAction(settings, injector), new RestSqlStatsAction(settings, restController), diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java index ffdd90504f7..4470a165075 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java @@ -17,6 +17,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.OpenSearchException; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.core.action.ActionListener; import org.opensearch.core.rest.RestStatus; import org.opensearch.index.IndexNotFoundException; @@ -29,6 +30,7 @@ import org.opensearch.sql.exception.ExpressionEvaluationException; import org.opensearch.sql.exception.QueryEngineException; import org.opensearch.sql.exception.SemanticCheckException; 
+import org.opensearch.sql.executor.QueryType; import org.opensearch.sql.legacy.metrics.MetricName; import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.opensearch.response.error.ErrorMessageFactory; @@ -44,9 +46,13 @@ public class RestPPLQueryAction extends BaseRestHandler { private static final Logger LOG = LogManager.getLogger(); + /** Unified query handler for Parquet-backed indices (Analytics engine path). */ + private final RestUnifiedQueryAction unifiedQueryHandler; + /** Constructor of RestPPLQueryAction. */ - public RestPPLQueryAction() { + public RestPPLQueryAction(ClusterService clusterService, NodeClient client) { super(); + this.unifiedQueryHandler = new RestUnifiedQueryAction(client, new StubQueryPlanExecutor()); } private static boolean isClientError(Exception e) { @@ -86,6 +92,12 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient nod TransportPPLQueryRequest transportPPLQueryRequest = new TransportPPLQueryRequest(PPLQueryRequestFactory.getPPLRequest(request)); + // Route to analytics engine for non-Lucene (e.g., Parquet-backed) indices + String pplQuery = transportPPLQueryRequest.toPPLQueryRequest().getRequest(); + if (RestUnifiedQueryAction.isAnalyticsIndex(pplQuery)) { + return channel -> unifiedQueryHandler.execute(pplQuery, QueryType.PPL, channel); + } + return channel -> nodeClient.execute( PPLQueryAction.INSTANCE, diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java new file mode 100644 index 00000000000..113a9614975 --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java @@ -0,0 +1,263 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.plugin.rest; + +import static org.opensearch.core.rest.RestStatus.OK; +import static 
org.opensearch.sql.executor.ExecutionEngine.QueryResponse; +import static org.opensearch.sql.lang.PPLLangSpec.PPL_SPEC; +import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME; +import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY; + +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.Table; +import org.apache.calcite.schema.impl.AbstractSchema; +import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestChannel; +import org.opensearch.sql.api.UnifiedQueryContext; +import org.opensearch.sql.api.UnifiedQueryPlanner; +import org.opensearch.sql.calcite.CalcitePlanContext; +import org.opensearch.sql.common.antlr.SyntaxCheckException; +import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.exception.SemanticCheckException; +import org.opensearch.sql.executor.QueryType; +import org.opensearch.sql.executor.analytics.AnalyticsExecutionEngine; +import org.opensearch.sql.executor.analytics.QueryPlanExecutor; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.metrics.Metrics; +import org.opensearch.sql.protocol.response.QueryResult; +import org.opensearch.sql.protocol.response.format.JdbcResponseFormatter; +import org.opensearch.sql.protocol.response.format.ResponseFormatter; +import org.opensearch.transport.client.node.NodeClient; + +/** + * Handles queries routed to the Analytics engine via the unified query 
pipeline. Parses PPL queries + * using {@link UnifiedQueryPlanner} to generate a Calcite {@link RelNode}, then delegates to {@link + * AnalyticsExecutionEngine} for execution. + */ +public class RestUnifiedQueryAction { + + private static final Logger LOG = LogManager.getLogger(RestUnifiedQueryAction.class); + private static final String SCHEMA_NAME = "opensearch"; + + /** + * Pattern to extract index name from PPL source clause. Matches: source = index, source=index, + * source = `index`, source = catalog.index + */ + private static final Pattern SOURCE_PATTERN = + Pattern.compile( + "source\\s*=\\s*`?([a-zA-Z0-9_.*]+(?:\\.[a-zA-Z0-9_.*]+)*)`?", Pattern.CASE_INSENSITIVE); + + private final AnalyticsExecutionEngine analyticsEngine; + private final NodeClient client; + + public RestUnifiedQueryAction(NodeClient client, QueryPlanExecutor planExecutor) { + this.client = client; + this.analyticsEngine = new AnalyticsExecutionEngine(planExecutor); + } + + /** + * Check if the query targets an analytics engine index (e.g., Parquet-backed). Currently uses a + * prefix convention ("parquet_"). In production, this will check index settings such as + * index.storage_type. + */ + public static boolean isAnalyticsIndex(String query) { + if (query == null) { + return false; + } + String indexName = extractIndexName(query); + if (indexName == null) { + return false; + } + // Handle qualified names like "catalog.parquet_logs" — check the last segment + int lastDot = indexName.lastIndexOf('.'); + String tableName = lastDot >= 0 ? indexName.substring(lastDot + 1) : indexName; + return tableName.startsWith("parquet_"); + } + + /** + * Extract the source index name from a PPL query string. 
+ * + * @param query the PPL query string + * @return the index name, or null if not found + */ + static String extractIndexName(String query) { + Matcher matcher = SOURCE_PATTERN.matcher(query); + if (matcher.find()) { + return matcher.group(1); + } + return null; + } + + /** + * Execute a query through the unified query pipeline on the sql-worker thread pool. + * + * @param query the PPL query string + * @param queryType SQL or PPL + * @param channel the REST channel for sending the response + */ + public void execute(String query, QueryType queryType, RestChannel channel) { + client + .threadPool() + .schedule( + () -> doExecute(query, queryType, channel), + new TimeValue(0), + SQL_WORKER_THREAD_POOL_NAME); + } + + private void doExecute(String query, QueryType queryType, RestChannel channel) { + try { + long startTime = System.nanoTime(); + + // TODO: Replace stub schema with EngineContext.getSchema() when analytics engine is ready + AbstractSchema schema = buildStubSchema(); + + try (UnifiedQueryContext context = + UnifiedQueryContext.builder() + .language(queryType) + .catalog(SCHEMA_NAME, schema) + .defaultNamespace(SCHEMA_NAME) + .build()) { + + UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context); + RelNode plan = planner.plan(query); + long planTime = System.nanoTime(); + LOG.info( + "[unified] Planning completed in {}ms for {} query", + (planTime - startTime) / 1_000_000, + queryType); + + CalcitePlanContext planContext = context.getPlanContext(); + analyticsEngine.execute(plan, planContext, createQueryListener(channel, planTime)); + } + } catch (Exception e) { + recordFailureMetric(e); + reportError(channel, e); + } + } + + private ResponseListener createQueryListener( + RestChannel channel, long planEndTime) { + ResponseFormatter formatter = new JdbcResponseFormatter(PRETTY); + return new ResponseListener() { + @Override + public void onResponse(QueryResponse response) { + long execTime = System.nanoTime(); + LOG.info( + "[unified] Execution 
completed in {}ms, {} rows returned", + (execTime - planEndTime) / 1_000_000, + response.getResults().size()); + Metrics.getInstance().getNumericalMetric(MetricName.PPL_REQ_TOTAL).increment(); + Metrics.getInstance().getNumericalMetric(MetricName.PPL_REQ_COUNT_TOTAL).increment(); + String result = + formatter.format( + new QueryResult( + response.getSchema(), response.getResults(), response.getCursor(), PPL_SPEC)); + channel.sendResponse(new BytesRestResponse(OK, formatter.contentType(), result)); + } + + @Override + public void onFailure(Exception e) { + recordFailureMetric(e); + reportError(channel, e); + } + }; + } + + /** + * Stub schema for development and testing. Returns a hardcoded table definition for any + * "parquet_*" table. Will be replaced by EngineContext.getSchema() when the analytics engine is + * ready. + */ + private static AbstractSchema buildStubSchema() { + return new AbstractSchema() { + @Override + protected Map getTableMap() { + return Map.of( + "parquet_logs", buildStubTable(), + "parquet_metrics", buildStubMetricsTable()); + } + }; + } + + private static Table buildStubTable() { + return new AbstractTable() { + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return typeFactory + .builder() + .add("ts", SqlTypeName.TIMESTAMP) + .add("status", SqlTypeName.INTEGER) + .add("message", SqlTypeName.VARCHAR) + .add("ip_addr", SqlTypeName.VARCHAR) + .build(); + } + }; + } + + private static Table buildStubMetricsTable() { + return new AbstractTable() { + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return typeFactory + .builder() + .add("ts", SqlTypeName.TIMESTAMP) + .add("cpu", SqlTypeName.DOUBLE) + .add("memory", SqlTypeName.DOUBLE) + .add("host", SqlTypeName.VARCHAR) + .build(); + } + }; + } + + /** Classify whether the exception is a client error (bad query) or server error (engine bug). 
*/ + private static boolean isClientError(Exception e) { + return e instanceof SyntaxCheckException + || e instanceof SemanticCheckException + || e instanceof IllegalArgumentException + || e instanceof NullPointerException; + } + + private static void recordFailureMetric(Exception e) { + if (isClientError(e)) { + LOG.warn("[unified] Client error in query execution", e); + Metrics.getInstance().getNumericalMetric(MetricName.PPL_FAILED_REQ_COUNT_CUS).increment(); + } else { + LOG.error("[unified] Server error in query execution", e); + Metrics.getInstance().getNumericalMetric(MetricName.PPL_FAILED_REQ_COUNT_SYS).increment(); + } + } + + private static void reportError(RestChannel channel, Exception e) { + RestStatus status = + isClientError(e) ? RestStatus.BAD_REQUEST : RestStatus.INTERNAL_SERVER_ERROR; + String reason = e.getMessage() != null ? e.getMessage() : "Unknown error"; + // Escape characters that would break JSON + reason = + reason.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n").replace("\r", ""); + channel.sendResponse( + new BytesRestResponse( + status, + "application/json; charset=UTF-8", + "{\"error\":{\"type\":\"" + + e.getClass().getSimpleName() + + "\",\"reason\":\"" + + reason + + "\"},\"status\":" + + status.getStatus() + + "}")); + } +} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/StubQueryPlanExecutor.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/StubQueryPlanExecutor.java new file mode 100644 index 00000000000..fdd15eb7c17 --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/StubQueryPlanExecutor.java @@ -0,0 +1,60 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.plugin.rest; + +import java.time.Instant; +import java.util.List; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.rel.RelNode; +import org.opensearch.sql.executor.analytics.QueryPlanExecutor; + +/** + * Stub implementation of 
{@link QueryPlanExecutor} for development and testing. Returns canned data + * so the full pipeline (routing → planning → execution → response formatting) can be validated + * without the analytics engine. + * + *

Will be replaced by the real analytics engine implementation when available. + */ +public class StubQueryPlanExecutor implements QueryPlanExecutor { + + @Override + public Iterable execute(RelNode plan, Object context) { + // Return canned rows matching the stub schema defined in RestUnifiedQueryAction. + // The column order must match the schema: ts, status, message, ip_addr + // (for parquet_logs table). For other tables, return empty results. + String tableName = extractTableName(plan); + if (tableName != null && tableName.contains("parquet_logs")) { + return List.of( + new Object[] { + Instant.parse("2024-01-15T10:30:00Z"), 200, "Request completed", "192.168.1.1" + }, + new Object[] { + Instant.parse("2024-01-15T10:31:00Z"), 200, "Health check OK", "192.168.1.2" + }, + new Object[] { + Instant.parse("2024-01-15T10:32:00Z"), 500, "Internal server error", "192.168.1.3" + }); + } + if (tableName != null && tableName.contains("parquet_metrics")) { + return List.of( + new Object[] {Instant.parse("2024-01-15T10:30:00Z"), 75.5, 8192.5, "host-1"}, + new Object[] {Instant.parse("2024-01-15T10:31:00Z"), 82.3, 7680.5, "host-2"}); + } + return List.of(); + } + + private String extractTableName(RelNode plan) { + // Use RelOptUtil.toString to get the full plan tree including child nodes + String planStr = RelOptUtil.toString(plan); + if (planStr.contains("parquet_logs")) { + return "parquet_logs"; + } + if (planStr.contains("parquet_metrics")) { + return "parquet_metrics"; + } + return null; + } +} diff --git a/plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java b/plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java new file mode 100644 index 00000000000..31d719acc0d --- /dev/null +++ b/plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java @@ -0,0 +1,32 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.plugin.rest; 
+ +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; + +/** + * Tests for analytics index routing in RestUnifiedQueryAction. Index name extraction will be + * replaced by UnifiedQueryParser -- these tests focus on routing behavior only. + */ +public class RestUnifiedQueryActionTest { + + @Test + public void parquetIndexRoutesToAnalytics() { + assertTrue(RestUnifiedQueryAction.isAnalyticsIndex("source = parquet_logs | fields ts")); + assertTrue( + RestUnifiedQueryAction.isAnalyticsIndex("source = opensearch.parquet_logs | fields ts")); + } + + @Test + public void nonParquetIndexRoutesToLucene() { + assertFalse(RestUnifiedQueryAction.isAnalyticsIndex("source = my_logs | fields ts")); + assertFalse(RestUnifiedQueryAction.isAnalyticsIndex(null)); + assertFalse(RestUnifiedQueryAction.isAnalyticsIndex("")); + } +} From 6d0c88187efdc5814419c579a56bc279590919a7 Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Wed, 25 Mar 2026 14:14:41 -0700 Subject: [PATCH 02/11] Align isClientError with RestPPLQueryAction classification Add missing exception types to isClientError(): IndexNotFoundException, ExpressionEvaluationException, QueryEngineException, DataSourceClientException, IllegalAccessException. Matches the full list in RestPPLQueryAction.isClientError(). 
Signed-off-by: Kai Huang Signed-off-by: Kai Huang --- .../plugin/rest/RestUnifiedQueryAction.java | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java index 113a9614975..f873179bbde 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java @@ -25,6 +25,7 @@ import org.apache.logging.log4j.Logger; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.rest.RestStatus; +import org.opensearch.index.IndexNotFoundException; import org.opensearch.rest.BytesRestResponse; import org.opensearch.rest.RestChannel; import org.opensearch.sql.api.UnifiedQueryContext; @@ -32,6 +33,9 @@ import org.opensearch.sql.calcite.CalcitePlanContext; import org.opensearch.sql.common.antlr.SyntaxCheckException; import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.datasources.exceptions.DataSourceClientException; +import org.opensearch.sql.exception.ExpressionEvaluationException; +import org.opensearch.sql.exception.QueryEngineException; import org.opensearch.sql.exception.SemanticCheckException; import org.opensearch.sql.executor.QueryType; import org.opensearch.sql.executor.analytics.AnalyticsExecutionEngine; @@ -223,12 +227,20 @@ public RelDataType getRowType(RelDataTypeFactory typeFactory) { }; } - /** Classify whether the exception is a client error (bad query) or server error (engine bug). */ + /** + * Classify whether the exception is a client error (bad query) or server error (engine bug). + * Matches the classification in {@link RestPPLQueryAction#isClientError}. 
+ */ private static boolean isClientError(Exception e) { - return e instanceof SyntaxCheckException - || e instanceof SemanticCheckException + return e instanceof NullPointerException || e instanceof IllegalArgumentException - || e instanceof NullPointerException; + || e instanceof IndexNotFoundException + || e instanceof SemanticCheckException + || e instanceof ExpressionEvaluationException + || e instanceof QueryEngineException + || e instanceof SyntaxCheckException + || e instanceof DataSourceClientException + || e instanceof IllegalAccessException; } private static void recordFailureMetric(Exception e) { From 057fb169eed8e451acdc81d9e6f06c635343a2e7 Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Wed, 25 Mar 2026 14:20:19 -0700 Subject: [PATCH 03/11] Move stub code into analytics/stub sub-package Extract StubSchemaProvider, StubQueryPlanExecutor, and StubIndexDetector into plugin/.../rest/analytics/stub/ package to clearly separate temporary stub code from production code. RestUnifiedQueryAction now delegates to these stub classes. 
Signed-off-by: Kai Huang Signed-off-by: Kai Huang --- .../sql/plugin/rest/RestPPLQueryAction.java | 4 +- .../plugin/rest/RestUnifiedQueryAction.java | 100 ++---------------- .../analytics/stub/StubIndexDetector.java | 59 +++++++++++ .../stub}/StubQueryPlanExecutor.java | 8 +- .../analytics/stub/StubSchemaProvider.java | 64 +++++++++++ 5 files changed, 135 insertions(+), 100 deletions(-) create mode 100644 plugin/src/main/java/org/opensearch/sql/plugin/rest/analytics/stub/StubIndexDetector.java rename plugin/src/main/java/org/opensearch/sql/plugin/rest/{ => analytics/stub}/StubQueryPlanExecutor.java (79%) create mode 100644 plugin/src/main/java/org/opensearch/sql/plugin/rest/analytics/stub/StubSchemaProvider.java diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java index 4470a165075..527eb245909 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java @@ -52,7 +52,9 @@ public class RestPPLQueryAction extends BaseRestHandler { /** Constructor of RestPPLQueryAction. 
*/ public RestPPLQueryAction(ClusterService clusterService, NodeClient client) { super(); - this.unifiedQueryHandler = new RestUnifiedQueryAction(client, new StubQueryPlanExecutor()); + this.unifiedQueryHandler = + new RestUnifiedQueryAction( + client, new org.opensearch.sql.plugin.rest.analytics.stub.StubQueryPlanExecutor()); } private static boolean isClientError(Exception e) { diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java index f873179bbde..a15be76891d 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java @@ -11,16 +11,8 @@ import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME; import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY; -import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.schema.Table; import org.apache.calcite.schema.impl.AbstractSchema; -import org.apache.calcite.schema.impl.AbstractTable; -import org.apache.calcite.sql.type.SqlTypeName; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.common.unit.TimeValue; @@ -42,6 +34,8 @@ import org.opensearch.sql.executor.analytics.QueryPlanExecutor; import org.opensearch.sql.legacy.metrics.MetricName; import org.opensearch.sql.legacy.metrics.Metrics; +import org.opensearch.sql.plugin.rest.analytics.stub.StubIndexDetector; +import org.opensearch.sql.plugin.rest.analytics.stub.StubSchemaProvider; import org.opensearch.sql.protocol.response.QueryResult; import 
org.opensearch.sql.protocol.response.format.JdbcResponseFormatter; import org.opensearch.sql.protocol.response.format.ResponseFormatter; @@ -57,14 +51,6 @@ public class RestUnifiedQueryAction { private static final Logger LOG = LogManager.getLogger(RestUnifiedQueryAction.class); private static final String SCHEMA_NAME = "opensearch"; - /** - * Pattern to extract index name from PPL source clause. Matches: source = index, source=index, - * source = `index`, source = catalog.index - */ - private static final Pattern SOURCE_PATTERN = - Pattern.compile( - "source\\s*=\\s*`?([a-zA-Z0-9_.*]+(?:\\.[a-zA-Z0-9_.*]+)*)`?", Pattern.CASE_INSENSITIVE); - private final AnalyticsExecutionEngine analyticsEngine; private final NodeClient client; @@ -74,36 +60,11 @@ public RestUnifiedQueryAction(NodeClient client, QueryPlanExecutor planExecutor) } /** - * Check if the query targets an analytics engine index (e.g., Parquet-backed). Currently uses a - * prefix convention ("parquet_"). In production, this will check index settings such as - * index.storage_type. + * Check if the query targets an analytics engine index. Delegates to {@link StubIndexDetector} + * which will be replaced by UnifiedQueryParser and index settings when available. */ public static boolean isAnalyticsIndex(String query) { - if (query == null) { - return false; - } - String indexName = extractIndexName(query); - if (indexName == null) { - return false; - } - // Handle qualified names like "catalog.parquet_logs" — check the last segment - int lastDot = indexName.lastIndexOf('.'); - String tableName = lastDot >= 0 ? indexName.substring(lastDot + 1) : indexName; - return tableName.startsWith("parquet_"); - } - - /** - * Extract the source index name from a PPL query string. 
- * - * @param query the PPL query string - * @return the index name, or null if not found - */ - static String extractIndexName(String query) { - Matcher matcher = SOURCE_PATTERN.matcher(query); - if (matcher.find()) { - return matcher.group(1); - } - return null; + return StubIndexDetector.isAnalyticsIndex(query); } /** @@ -126,8 +87,8 @@ private void doExecute(String query, QueryType queryType, RestChannel channel) { try { long startTime = System.nanoTime(); - // TODO: Replace stub schema with EngineContext.getSchema() when analytics engine is ready - AbstractSchema schema = buildStubSchema(); + // TODO: Replace with EngineContext.getSchema() when analytics engine is ready + AbstractSchema schema = StubSchemaProvider.buildSchema(); try (UnifiedQueryContext context = UnifiedQueryContext.builder() @@ -181,52 +142,6 @@ public void onFailure(Exception e) { }; } - /** - * Stub schema for development and testing. Returns a hardcoded table definition for any - * "parquet_*" table. Will be replaced by EngineContext.getSchema() when the analytics engine is - * ready. 
- */ - private static AbstractSchema buildStubSchema() { - return new AbstractSchema() { - @Override - protected Map getTableMap() { - return Map.of( - "parquet_logs", buildStubTable(), - "parquet_metrics", buildStubMetricsTable()); - } - }; - } - - private static Table buildStubTable() { - return new AbstractTable() { - @Override - public RelDataType getRowType(RelDataTypeFactory typeFactory) { - return typeFactory - .builder() - .add("ts", SqlTypeName.TIMESTAMP) - .add("status", SqlTypeName.INTEGER) - .add("message", SqlTypeName.VARCHAR) - .add("ip_addr", SqlTypeName.VARCHAR) - .build(); - } - }; - } - - private static Table buildStubMetricsTable() { - return new AbstractTable() { - @Override - public RelDataType getRowType(RelDataTypeFactory typeFactory) { - return typeFactory - .builder() - .add("ts", SqlTypeName.TIMESTAMP) - .add("cpu", SqlTypeName.DOUBLE) - .add("memory", SqlTypeName.DOUBLE) - .add("host", SqlTypeName.VARCHAR) - .build(); - } - }; - } - /** * Classify whether the exception is a client error (bad query) or server error (engine bug). * Matches the classification in {@link RestPPLQueryAction#isClientError}. @@ -257,7 +172,6 @@ private static void reportError(RestChannel channel, Exception e) { RestStatus status = isClientError(e) ? RestStatus.BAD_REQUEST : RestStatus.INTERNAL_SERVER_ERROR; String reason = e.getMessage() != null ? 
e.getMessage() : "Unknown error"; - // Escape characters that would break JSON reason = reason.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n").replace("\r", ""); channel.sendResponse( diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/analytics/stub/StubIndexDetector.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/analytics/stub/StubIndexDetector.java new file mode 100644 index 00000000000..b809adfbc9b --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/analytics/stub/StubIndexDetector.java @@ -0,0 +1,59 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.plugin.rest.analytics.stub; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Temporary index detection logic for routing queries to the analytics engine. Uses a regex to + * extract the index name and a "parquet_" prefix convention to identify analytics indices. + * + *

Will be replaced by {@code UnifiedQueryParser} for index extraction and index settings (e.g., + * {@code index.storage_type}) for detection when available. + */ +public class StubIndexDetector { + + /** + * Pattern to extract index name from PPL source clause. Matches: source = index, source=index, + * source = `index`, source = catalog.index + */ + private static final Pattern SOURCE_PATTERN = + Pattern.compile( + "source\\s*=\\s*`?([a-zA-Z0-9_.*]+(?:\\.[a-zA-Z0-9_.*]+)*)`?", Pattern.CASE_INSENSITIVE); + + /** + * Check if the query targets an analytics engine index (e.g., Parquet-backed). Currently uses a + * prefix convention ("parquet_"). In production, this will check index settings such as + * index.storage_type. + */ + public static boolean isAnalyticsIndex(String query) { + if (query == null) { + return false; + } + String indexName = extractIndexName(query); + if (indexName == null) { + return false; + } + int lastDot = indexName.lastIndexOf('.'); + String tableName = lastDot >= 0 ? indexName.substring(lastDot + 1) : indexName; + return tableName.startsWith("parquet_"); + } + + /** + * Extract the source index name from a PPL query string. 
+ * + * @param query the PPL query string + * @return the index name, or null if not found + */ + static String extractIndexName(String query) { + Matcher matcher = SOURCE_PATTERN.matcher(query); + if (matcher.find()) { + return matcher.group(1); + } + return null; + } +} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/StubQueryPlanExecutor.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/analytics/stub/StubQueryPlanExecutor.java similarity index 79% rename from plugin/src/main/java/org/opensearch/sql/plugin/rest/StubQueryPlanExecutor.java rename to plugin/src/main/java/org/opensearch/sql/plugin/rest/analytics/stub/StubQueryPlanExecutor.java index fdd15eb7c17..598d0e13699 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/StubQueryPlanExecutor.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/analytics/stub/StubQueryPlanExecutor.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.sql.plugin.rest; +package org.opensearch.sql.plugin.rest.analytics.stub; import java.time.Instant; import java.util.List; @@ -13,7 +13,7 @@ /** * Stub implementation of {@link QueryPlanExecutor} for development and testing. Returns canned data - * so the full pipeline (routing → planning → execution → response formatting) can be validated + * so the full pipeline (routing -> planning -> execution -> response formatting) can be validated * without the analytics engine. * *

Will be replaced by the real analytics engine implementation when available. @@ -22,9 +22,6 @@ public class StubQueryPlanExecutor implements QueryPlanExecutor { @Override public Iterable execute(RelNode plan, Object context) { - // Return canned rows matching the stub schema defined in RestUnifiedQueryAction. - // The column order must match the schema: ts, status, message, ip_addr - // (for parquet_logs table). For other tables, return empty results. String tableName = extractTableName(plan); if (tableName != null && tableName.contains("parquet_logs")) { return List.of( @@ -47,7 +44,6 @@ public Iterable execute(RelNode plan, Object context) { } private String extractTableName(RelNode plan) { - // Use RelOptUtil.toString to get the full plan tree including child nodes String planStr = RelOptUtil.toString(plan); if (planStr.contains("parquet_logs")) { return "parquet_logs"; diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/analytics/stub/StubSchemaProvider.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/analytics/stub/StubSchemaProvider.java new file mode 100644 index 00000000000..d45a97f861f --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/analytics/stub/StubSchemaProvider.java @@ -0,0 +1,64 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.plugin.rest.analytics.stub; + +import java.util.Map; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.Table; +import org.apache.calcite.schema.impl.AbstractSchema; +import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * Stub schema provider for development and testing. Returns hardcoded Calcite table definitions + * with standard types. Will be replaced by {@code EngineContext.getSchema()} when the analytics + * engine is ready. 
+ */ +public class StubSchemaProvider { + + /** Build a stub Calcite schema with hardcoded parquet tables. */ + public static AbstractSchema buildSchema() { + return new AbstractSchema() { + @Override + protected Map getTableMap() { + return Map.of( + "parquet_logs", buildLogsTable(), + "parquet_metrics", buildMetricsTable()); + } + }; + } + + private static Table buildLogsTable() { + return new AbstractTable() { + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return typeFactory + .builder() + .add("ts", SqlTypeName.TIMESTAMP) + .add("status", SqlTypeName.INTEGER) + .add("message", SqlTypeName.VARCHAR) + .add("ip_addr", SqlTypeName.VARCHAR) + .build(); + } + }; + } + + private static Table buildMetricsTable() { + return new AbstractTable() { + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return typeFactory + .builder() + .add("ts", SqlTypeName.TIMESTAMP) + .add("cpu", SqlTypeName.DOUBLE) + .add("memory", SqlTypeName.DOUBLE) + .add("host", SqlTypeName.VARCHAR) + .build(); + } + }; + } +} From 09fb0c49ffe6ee4a9bac60190cf238e1a6fdca53 Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Wed, 25 Mar 2026 14:36:50 -0700 Subject: [PATCH 04/11] Use ErrorMessageFactory for error responses Replace hand-crafted JSON error response with ErrorMessageFactory.createErrorMessage(), matching the standard error format used in RestPPLQueryAction.reportError(). 
Signed-off-by: Kai Huang Signed-off-by: Kai Huang --- .../sql/plugin/rest/RestUnifiedQueryAction.java | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java index a15be76891d..b32c2876ba8 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java @@ -34,6 +34,7 @@ import org.opensearch.sql.executor.analytics.QueryPlanExecutor; import org.opensearch.sql.legacy.metrics.MetricName; import org.opensearch.sql.legacy.metrics.Metrics; +import org.opensearch.sql.opensearch.response.error.ErrorMessageFactory; import org.opensearch.sql.plugin.rest.analytics.stub.StubIndexDetector; import org.opensearch.sql.plugin.rest.analytics.stub.StubSchemaProvider; import org.opensearch.sql.protocol.response.QueryResult; @@ -171,19 +172,8 @@ private static void recordFailureMetric(Exception e) { private static void reportError(RestChannel channel, Exception e) { RestStatus status = isClientError(e) ? RestStatus.BAD_REQUEST : RestStatus.INTERNAL_SERVER_ERROR; - String reason = e.getMessage() != null ? 
e.getMessage() : "Unknown error"; - reason = - reason.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n").replace("\r", ""); channel.sendResponse( new BytesRestResponse( - status, - "application/json; charset=UTF-8", - "{\"error\":{\"type\":\"" - + e.getClass().getSimpleName() - + "\",\"reason\":\"" - + reason - + "\"},\"status\":" - + status.getStatus() - + "}")); + status, ErrorMessageFactory.createErrorMessage(e, status.getStatus()).toString())); } } From 451405f2a9be006aa03868463c513d71fb1c8005 Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Wed, 25 Mar 2026 14:44:29 -0700 Subject: [PATCH 05/11] Make metrics and response formatting query-type-aware Use QueryType to select the correct metrics (PPL_REQ_TOTAL vs REQ_TOTAL, PPL_FAILED_REQ_COUNT_* vs FAILED_REQ_COUNT_*) and LangSpec (PPL_SPEC vs SQL_SPEC) so this class can serve both PPL and SQL queries when unified SQL support is added. Signed-off-by: Kai Huang Signed-off-by: Kai Huang --- .../plugin/rest/RestUnifiedQueryAction.java | 42 ++++++++++++++----- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java index b32c2876ba8..8acb5d39278 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java @@ -32,6 +32,7 @@ import org.opensearch.sql.executor.QueryType; import org.opensearch.sql.executor.analytics.AnalyticsExecutionEngine; import org.opensearch.sql.executor.analytics.QueryPlanExecutor; +import org.opensearch.sql.lang.LangSpec; import org.opensearch.sql.legacy.metrics.MetricName; import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.opensearch.response.error.ErrorMessageFactory; @@ -107,16 +108,17 @@ private void doExecute(String query, QueryType queryType, RestChannel channel) { 
queryType); CalcitePlanContext planContext = context.getPlanContext(); - analyticsEngine.execute(plan, planContext, createQueryListener(channel, planTime)); + analyticsEngine.execute( + plan, planContext, createQueryListener(queryType, channel, planTime)); } } catch (Exception e) { - recordFailureMetric(e); + recordFailureMetric(queryType, e); reportError(channel, e); } } private ResponseListener createQueryListener( - RestChannel channel, long planEndTime) { + QueryType queryType, RestChannel channel, long planEndTime) { ResponseFormatter formatter = new JdbcResponseFormatter(PRETTY); return new ResponseListener() { @Override @@ -126,18 +128,18 @@ public void onResponse(QueryResponse response) { "[unified] Execution completed in {}ms, {} rows returned", (execTime - planEndTime) / 1_000_000, response.getResults().size()); - Metrics.getInstance().getNumericalMetric(MetricName.PPL_REQ_TOTAL).increment(); - Metrics.getInstance().getNumericalMetric(MetricName.PPL_REQ_COUNT_TOTAL).increment(); + recordSuccessMetric(queryType); + LangSpec langSpec = queryType == QueryType.PPL ? 
PPL_SPEC : LangSpec.SQL_SPEC; String result = formatter.format( new QueryResult( - response.getSchema(), response.getResults(), response.getCursor(), PPL_SPEC)); + response.getSchema(), response.getResults(), response.getCursor(), langSpec)); channel.sendResponse(new BytesRestResponse(OK, formatter.contentType(), result)); } @Override public void onFailure(Exception e) { - recordFailureMetric(e); + recordFailureMetric(queryType, e); reportError(channel, e); } }; @@ -159,13 +161,33 @@ private static boolean isClientError(Exception e) { || e instanceof IllegalAccessException; } - private static void recordFailureMetric(Exception e) { + private static void recordSuccessMetric(QueryType queryType) { + if (queryType == QueryType.PPL) { + Metrics.getInstance().getNumericalMetric(MetricName.PPL_REQ_TOTAL).increment(); + Metrics.getInstance().getNumericalMetric(MetricName.PPL_REQ_COUNT_TOTAL).increment(); + } else { + Metrics.getInstance().getNumericalMetric(MetricName.REQ_TOTAL).increment(); + Metrics.getInstance().getNumericalMetric(MetricName.REQ_COUNT_TOTAL).increment(); + } + } + + private static void recordFailureMetric(QueryType queryType, Exception e) { if (isClientError(e)) { LOG.warn("[unified] Client error in query execution", e); - Metrics.getInstance().getNumericalMetric(MetricName.PPL_FAILED_REQ_COUNT_CUS).increment(); + Metrics.getInstance() + .getNumericalMetric( + queryType == QueryType.PPL + ? MetricName.PPL_FAILED_REQ_COUNT_CUS + : MetricName.FAILED_REQ_COUNT_CUS) + .increment(); } else { LOG.error("[unified] Server error in query execution", e); - Metrics.getInstance().getNumericalMetric(MetricName.PPL_FAILED_REQ_COUNT_SYS).increment(); + Metrics.getInstance() + .getNumericalMetric( + queryType == QueryType.PPL + ? 
MetricName.PPL_FAILED_REQ_COUNT_SYS + : MetricName.FAILED_REQ_COUNT_SYS) + .increment(); } } From 7613c44f83bc1e4d044296427d4dd56fc6ed8684 Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Wed, 25 Mar 2026 14:57:26 -0700 Subject: [PATCH 06/11] Move analytics routing from REST layer to transport layer Move the analytics index routing check from RestPPLQueryAction into TransportPPLQueryAction.doExecute(). This ensures the analytics path gets the same PPL enabled check, metrics, request ID, and inter-plugin transport support as the existing Lucene path. RestPPLQueryAction and SQLPlugin are reverted to their original state. Added executeViaTransport() to RestUnifiedQueryAction which returns results via ActionListener instead of RestChannel, integrating properly with the transport action pattern. Signed-off-by: Kai Huang Signed-off-by: Kai Huang --- .../org/opensearch/sql/plugin/SQLPlugin.java | 2 +- .../sql/plugin/rest/RestPPLQueryAction.java | 16 +--- .../plugin/rest/RestUnifiedQueryAction.java | 87 +++++++++++++++++++ .../transport/TransportPPLQueryAction.java | 16 +++- 4 files changed, 104 insertions(+), 17 deletions(-) diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index 016d53dc8de..edffd65f6bf 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -163,7 +163,7 @@ public List getRestHandlers( Metrics.getInstance().registerDefaultMetrics(); return Arrays.asList( - new RestPPLQueryAction(clusterService, this.client), + new RestPPLQueryAction(), new RestPPLGrammarAction(), new RestSqlAction(settings, injector), new RestSqlStatsAction(settings, restController), diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java index 527eb245909..ffdd90504f7 100644 --- 
a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java @@ -17,7 +17,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.OpenSearchException; -import org.opensearch.cluster.service.ClusterService; import org.opensearch.core.action.ActionListener; import org.opensearch.core.rest.RestStatus; import org.opensearch.index.IndexNotFoundException; @@ -30,7 +29,6 @@ import org.opensearch.sql.exception.ExpressionEvaluationException; import org.opensearch.sql.exception.QueryEngineException; import org.opensearch.sql.exception.SemanticCheckException; -import org.opensearch.sql.executor.QueryType; import org.opensearch.sql.legacy.metrics.MetricName; import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.opensearch.response.error.ErrorMessageFactory; @@ -46,15 +44,9 @@ public class RestPPLQueryAction extends BaseRestHandler { private static final Logger LOG = LogManager.getLogger(); - /** Unified query handler for Parquet-backed indices (Analytics engine path). */ - private final RestUnifiedQueryAction unifiedQueryHandler; - /** Constructor of RestPPLQueryAction. 
*/ - public RestPPLQueryAction(ClusterService clusterService, NodeClient client) { + public RestPPLQueryAction() { super(); - this.unifiedQueryHandler = - new RestUnifiedQueryAction( - client, new org.opensearch.sql.plugin.rest.analytics.stub.StubQueryPlanExecutor()); } private static boolean isClientError(Exception e) { @@ -94,12 +86,6 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient nod TransportPPLQueryRequest transportPPLQueryRequest = new TransportPPLQueryRequest(PPLQueryRequestFactory.getPPLRequest(request)); - // Route to analytics engine for non-Lucene (e.g., Parquet-backed) indices - String pplQuery = transportPPLQueryRequest.toPPLQueryRequest().getRequest(); - if (RestUnifiedQueryAction.isAnalyticsIndex(pplQuery)) { - return channel -> unifiedQueryHandler.execute(pplQuery, QueryType.PPL, channel); - } - return channel -> nodeClient.execute( PPLQueryAction.INSTANCE, diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java index 8acb5d39278..010faa76a86 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java @@ -16,6 +16,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.action.ActionListener; import org.opensearch.core.rest.RestStatus; import org.opensearch.index.IndexNotFoundException; import org.opensearch.rest.BytesRestResponse; @@ -38,6 +39,8 @@ import org.opensearch.sql.opensearch.response.error.ErrorMessageFactory; import org.opensearch.sql.plugin.rest.analytics.stub.StubIndexDetector; import org.opensearch.sql.plugin.rest.analytics.stub.StubSchemaProvider; +import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse; +import 
org.opensearch.sql.ppl.domain.PPLQueryRequest; import org.opensearch.sql.protocol.response.QueryResult; import org.opensearch.sql.protocol.response.format.JdbcResponseFormatter; import org.opensearch.sql.protocol.response.format.ResponseFormatter; @@ -85,6 +88,90 @@ public void execute(String query, QueryType queryType, RestChannel channel) { SQL_WORKER_THREAD_POOL_NAME); } + /** + * Execute a query through the unified query pipeline, returning the result via transport action + * listener. Called from {@code TransportPPLQueryAction} for proper PPL enabled check, metrics, + * and request ID handling. + * + * @param query the PPL query string + * @param queryType SQL or PPL + * @param pplRequest the original PPL request (for format selection) + * @param listener the transport action listener + */ + public void executeViaTransport( + String query, + QueryType queryType, + PPLQueryRequest pplRequest, + ActionListener listener) { + client + .threadPool() + .schedule( + () -> doExecuteViaTransport(query, queryType, pplRequest, listener), + new TimeValue(0), + SQL_WORKER_THREAD_POOL_NAME); + } + + private void doExecuteViaTransport( + String query, + QueryType queryType, + PPLQueryRequest pplRequest, + ActionListener listener) { + try { + long startTime = System.nanoTime(); + AbstractSchema schema = StubSchemaProvider.buildSchema(); + + try (UnifiedQueryContext context = + UnifiedQueryContext.builder() + .language(queryType) + .catalog(SCHEMA_NAME, schema) + .defaultNamespace(SCHEMA_NAME) + .build()) { + + UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context); + RelNode plan = planner.plan(query); + long planTime = System.nanoTime(); + LOG.info( + "[unified] Planning completed in {}ms for {} query", + (planTime - startTime) / 1_000_000, + queryType); + + CalcitePlanContext planContext = context.getPlanContext(); + analyticsEngine.execute( + plan, planContext, createTransportQueryListener(queryType, planTime, listener)); + } + } catch (Exception e) { + 
listener.onFailure(e); + } + } + + private ResponseListener createTransportQueryListener( + QueryType queryType, + long planEndTime, + ActionListener transportListener) { + ResponseFormatter formatter = new JdbcResponseFormatter(PRETTY); + return new ResponseListener() { + @Override + public void onResponse(QueryResponse response) { + long execTime = System.nanoTime(); + LOG.info( + "[unified] Execution completed in {}ms, {} rows returned", + (execTime - planEndTime) / 1_000_000, + response.getResults().size()); + LangSpec langSpec = queryType == QueryType.PPL ? PPL_SPEC : LangSpec.SQL_SPEC; + String result = + formatter.format( + new QueryResult( + response.getSchema(), response.getResults(), response.getCursor(), langSpec)); + transportListener.onResponse(new TransportPPLQueryResponse(result)); + } + + @Override + public void onFailure(Exception e) { + transportListener.onFailure(e); + } + }; + } + private void doExecute(String query, QueryType queryType, RestChannel channel) { try { long startTime = System.nanoTime(); diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java index 48bc36374a8..60ba183d99b 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java @@ -28,11 +28,14 @@ import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.datasources.service.DataSourceServiceImpl; import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.executor.QueryType; import org.opensearch.sql.legacy.metrics.MetricName; import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.monitor.profile.QueryProfiling; import org.opensearch.sql.opensearch.setting.OpenSearchSettings; import org.opensearch.sql.plugin.config.OpenSearchPluginModule; +import 
org.opensearch.sql.plugin.rest.RestUnifiedQueryAction; +import org.opensearch.sql.plugin.rest.analytics.stub.StubQueryPlanExecutor; import org.opensearch.sql.ppl.PPLService; import org.opensearch.sql.ppl.domain.PPLQueryRequest; import org.opensearch.sql.protocol.response.QueryResult; @@ -56,6 +59,8 @@ public class TransportPPLQueryAction private final Supplier pplEnabled; + private final RestUnifiedQueryAction unifiedQueryHandler; + /** Constructor of TransportPPLQueryAction. */ @Inject public TransportPPLQueryAction( @@ -77,6 +82,7 @@ public TransportPPLQueryAction( b.bind(DataSourceService.class).toInstance(dataSourceService); }); this.injector = Guice.createInjector(modules); + this.unifiedQueryHandler = new RestUnifiedQueryAction(client, new StubQueryPlanExecutor()); this.pplEnabled = () -> MULTI_ALLOW_EXPLICIT_INDEX.get(clusterSettings) @@ -114,9 +120,17 @@ protected void doExecute( QueryContext.addRequestId(); - PPLService pplService = injector.getInstance(PPLService.class); // in order to use PPL service, we need to convert TransportPPLQueryRequest to PPLQueryRequest PPLQueryRequest transformedRequest = transportRequest.toPPLQueryRequest(); + + // Route to analytics engine for non-Lucene (e.g., Parquet-backed) indices + if (RestUnifiedQueryAction.isAnalyticsIndex(transformedRequest.getRequest())) { + unifiedQueryHandler.executeViaTransport( + transformedRequest.getRequest(), QueryType.PPL, transformedRequest, listener); + return; + } + + PPLService pplService = injector.getInstance(PPLService.class); QueryContext.setProfile(transformedRequest.profile()); ActionListener clearingListener = wrapWithProfilingClear(listener); From 6ce3ac568421226848d14ee6914d1fac68af238a Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Wed, 25 Mar 2026 15:13:42 -0700 Subject: [PATCH 07/11] Add query size limit to RelNode plan instead of post-processing Add LogicalSystemLimit to the RelNode plan before passing it to the analytics engine, consistent with PPL V3 
(QueryService.convertToCalcitePlan). This ensures the analytics engine enforces the limit during execution rather than returning all rows for post-processing truncation. Remove post-processing querySizeLimit truncation from AnalyticsExecutionEngine -- the limit is now part of the plan the executor receives. Signed-off-by: Kai Huang Signed-off-by: Kai Huang --- .../analytics/AnalyticsExecutionEngine.java | 9 ++----- .../AnalyticsExecutionEngineTest.java | 22 ++--------------- .../plugin/rest/RestUnifiedQueryAction.java | 24 +++++++++++++++++-- 3 files changed, 26 insertions(+), 29 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java b/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java index 4159fa4845d..67be7142463 100644 --- a/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java +++ b/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java @@ -65,11 +65,10 @@ public void explain(PhysicalPlan plan, ResponseListener listene public void execute( RelNode plan, CalcitePlanContext context, ResponseListener listener) { try { - Integer querySizeLimit = context.sysLimit.querySizeLimit(); Iterable rows = planExecutor.execute(plan, null); List fields = plan.getRowType().getFieldList(); - List results = convertRows(rows, fields, querySizeLimit); + List results = convertRows(rows, fields); Schema schema = buildSchema(fields); listener.onResponse(new QueryResponse(schema, results, Cursor.None)); @@ -78,13 +77,9 @@ public void execute( } } - private List convertRows( - Iterable rows, List fields, Integer querySizeLimit) { + private List convertRows(Iterable rows, List fields) { List results = new ArrayList<>(); for (Object[] row : rows) { - if (querySizeLimit != null && results.size() >= querySizeLimit) { - break; - } Map valueMap = new LinkedHashMap<>(); for (int i = 0; i < fields.size(); i++) { String columnName = 
fields.get(i).getName(); diff --git a/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java b/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java index 04e65b1d383..70ada76c54a 100644 --- a/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java +++ b/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java @@ -12,10 +12,8 @@ import static org.mockito.Mockito.when; import java.lang.reflect.Field; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.List; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import org.apache.calcite.rel.RelNode; @@ -148,24 +146,8 @@ void executeRelNode_temporalTypes() { assertEquals(0, response.getResults().size(), "Should have 0 rows. " + dump); } - @Test - void executeRelNode_querySizeLimit() throws Exception { - RelNode relNode = mockRelNode("id", SqlTypeName.INTEGER); - List manyRows = new ArrayList<>(); - for (int i = 0; i < 100; i++) { - manyRows.add(new Object[] {i}); - } - when(mockExecutor.execute(relNode, null)).thenReturn(manyRows); - setSysLimit(mockContext, new SysLimit(10, 10000, 50000)); - - QueryResponse response = executeAndCapture(relNode); - String dump = dumpResponse(response); - - assertEquals( - 10, - response.getResults().size(), - "Should truncate to querySizeLimit=10, got " + response.getResults().size() + ". " + dump); - } + // Query size limit is now enforced in the RelNode plan (LogicalSystemLimit) before it reaches + // AnalyticsExecutionEngine. The engine trusts the executor to honor the limit. 
@Test void executeRelNode_emptyResults() { diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java index 010faa76a86..829adf2c14d 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java @@ -24,6 +24,7 @@ import org.opensearch.sql.api.UnifiedQueryContext; import org.opensearch.sql.api.UnifiedQueryPlanner; import org.opensearch.sql.calcite.CalcitePlanContext; +import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit; import org.opensearch.sql.common.antlr.SyntaxCheckException; import org.opensearch.sql.common.response.ResponseListener; import org.opensearch.sql.datasources.exceptions.DataSourceClientException; @@ -129,13 +130,18 @@ private void doExecuteViaTransport( UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context); RelNode plan = planner.plan(query); + + // Add query size limit to the plan so the analytics engine can enforce it + // during execution, consistent with PPL V3 (see QueryService.convertToCalcitePlan) + CalcitePlanContext planContext = context.getPlanContext(); + plan = addQuerySizeLimit(plan, planContext); + long planTime = System.nanoTime(); LOG.info( "[unified] Planning completed in {}ms for {} query", (planTime - startTime) / 1_000_000, queryType); - CalcitePlanContext planContext = context.getPlanContext(); analyticsEngine.execute( plan, planContext, createTransportQueryListener(queryType, planTime, listener)); } @@ -144,6 +150,17 @@ private void doExecuteViaTransport( } } + /** + * Add a system-level query size limit to the plan. This ensures the analytics engine enforces the + * limit during execution rather than returning all rows for post-processing truncation. 
+ */ + private static RelNode addQuerySizeLimit(RelNode plan, CalcitePlanContext context) { + return LogicalSystemLimit.create( + LogicalSystemLimit.SystemLimitType.QUERY_SIZE_LIMIT, + plan, + context.relBuilder.literal(context.sysLimit.querySizeLimit())); + } + private ResponseListener createTransportQueryListener( QueryType queryType, long planEndTime, @@ -188,13 +205,16 @@ private void doExecute(String query, QueryType queryType, RestChannel channel) { UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context); RelNode plan = planner.plan(query); + + CalcitePlanContext planContext = context.getPlanContext(); + plan = addQuerySizeLimit(plan, planContext); + long planTime = System.nanoTime(); LOG.info( "[unified] Planning completed in {}ms for {} query", (planTime - startTime) / 1_000_000, queryType); - CalcitePlanContext planContext = context.getPlanContext(); analyticsEngine.execute( plan, planContext, createQueryListener(queryType, channel, planTime)); } From 0ea384ab296f6ffc857436285e087a5f2c326333 Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Thu, 26 Mar 2026 14:00:07 -0700 Subject: [PATCH 08/11] Propagate security context to sql-worker thread pool Wrap scheduled lambdas in both execute() and executeViaTransport() with withCurrentContext() to capture and restore ThreadContext (user identity, permissions, audit trail) on the worker thread. Follows the same pattern as OpenSearchQueryManager.withCurrentContext(). 
Signed-off-by: Kai Huang Signed-off-by: Kai Huang --- .../plugin/rest/RestUnifiedQueryAction.java | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java index 829adf2c14d..800e8fcbc5f 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java @@ -11,10 +11,12 @@ import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME; import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY; +import java.util.Map; import org.apache.calcite.rel.RelNode; import org.apache.calcite.schema.impl.AbstractSchema; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.ThreadContext; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; import org.opensearch.core.rest.RestStatus; @@ -84,7 +86,7 @@ public void execute(String query, QueryType queryType, RestChannel channel) { client .threadPool() .schedule( - () -> doExecute(query, queryType, channel), + withCurrentContext(() -> doExecute(query, queryType, channel)), new TimeValue(0), SQL_WORKER_THREAD_POOL_NAME); } @@ -107,7 +109,7 @@ public void executeViaTransport( client .threadPool() .schedule( - () -> doExecuteViaTransport(query, queryType, pplRequest, listener), + withCurrentContext(() -> doExecuteViaTransport(query, queryType, pplRequest, listener)), new TimeValue(0), SQL_WORKER_THREAD_POOL_NAME); } @@ -298,6 +300,19 @@ private static void recordFailureMetric(QueryType queryType, Exception e) { } } + /** + * Capture current thread context and restore it on the worker thread. 
Ensures security context + * (user identity, permissions) is propagated. Same pattern as {@link + * org.opensearch.sql.opensearch.executor.OpenSearchQueryManager#withCurrentContext}. + */ + private static Runnable withCurrentContext(final Runnable task) { + final Map currentContext = ThreadContext.getImmutableContext(); + return () -> { + ThreadContext.putAll(currentContext); + task.run(); + }; + } + private static void reportError(RestChannel channel, Exception e) { RestStatus status = isClientError(e) ? RestStatus.BAD_REQUEST : RestStatus.INTERNAL_SERVER_ERROR; From e08ef54a49ba5a8b5ab21645820de381d04563d4 Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Thu, 26 Mar 2026 14:02:05 -0700 Subject: [PATCH 09/11] Move analytics routing after profiling setup Move the analytics index routing check after QueryContext.setProfile() and wrapWithProfilingClear(listener). Use clearingListener instead of raw listener so profiling thread-local state is properly cleaned up after analytics path execution. 
Signed-off-by: Kai Huang Signed-off-by: Kai Huang --- .../sql/plugin/transport/TransportPPLQueryAction.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java index 60ba183d99b..f0c0d6c0121 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java @@ -122,17 +122,17 @@ protected void doExecute( // in order to use PPL service, we need to convert TransportPPLQueryRequest to PPLQueryRequest PPLQueryRequest transformedRequest = transportRequest.toPPLQueryRequest(); + QueryContext.setProfile(transformedRequest.profile()); + ActionListener clearingListener = wrapWithProfilingClear(listener); // Route to analytics engine for non-Lucene (e.g., Parquet-backed) indices if (RestUnifiedQueryAction.isAnalyticsIndex(transformedRequest.getRequest())) { unifiedQueryHandler.executeViaTransport( - transformedRequest.getRequest(), QueryType.PPL, transformedRequest, listener); + transformedRequest.getRequest(), QueryType.PPL, transformedRequest, clearingListener); return; } PPLService pplService = injector.getInstance(PPLService.class); - QueryContext.setProfile(transformedRequest.profile()); - ActionListener clearingListener = wrapWithProfilingClear(listener); if (transformedRequest.isExplainRequest()) { pplService.explain( From 70f7e65183ea2e19611bfb471c55a65b73463eea Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Thu, 26 Mar 2026 14:17:27 -0700 Subject: [PATCH 10/11] Remove NPE from isClientError classification NPE in the analytics path is a server bug (null schema field, missing row), not bad user input. Removed from client error list. Will sync this classification with RestPPLQueryAction updates in #5266. 
Signed-off-by: Kai Huang Signed-off-by: Kai Huang --- .../sql/plugin/rest/RestUnifiedQueryAction.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java index 800e8fcbc5f..89bde71df9a 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java @@ -255,12 +255,12 @@ public void onFailure(Exception e) { } /** - * Classify whether the exception is a client error (bad query) or server error (engine bug). - * Matches the classification in {@link RestPPLQueryAction#isClientError}. + * Classify whether the exception is a client error (bad query) or server error (engine bug). Will + * be synced with RestPPLQueryAction#isClientError updates in + * https://github.com/opensearch-project/sql/pull/5266. */ private static boolean isClientError(Exception e) { - return e instanceof NullPointerException - || e instanceof IllegalArgumentException + return e instanceof IllegalArgumentException || e instanceof IndexNotFoundException || e instanceof SemanticCheckException || e instanceof ExpressionEvaluationException From f96391188838649e3a908c52b7068929329988fe Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Thu, 26 Mar 2026 15:22:28 -0700 Subject: [PATCH 11/11] Remove duplicate REST execution path from RestUnifiedQueryAction Remove execute(query, queryType, channel), doExecute(), createQueryListener(channel), recordSuccessMetric(), recordFailureMetric(), reportError(), and related REST imports. Since routing now goes through TransportPPLQueryAction, the REST-specific path was unused. Renamed executeViaTransport() to execute() as the sole entry point. 
Signed-off-by: Kai Huang Signed-off-by: Kai Huang --- .../plugin/rest/RestUnifiedQueryAction.java | 168 ++---------------- .../transport/TransportPPLQueryAction.java | 2 +- 2 files changed, 12 insertions(+), 158 deletions(-) diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java index 89bde71df9a..460ad390a55 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java @@ -5,8 +5,6 @@ package org.opensearch.sql.plugin.rest; -import static org.opensearch.core.rest.RestStatus.OK; -import static org.opensearch.sql.executor.ExecutionEngine.QueryResponse; import static org.opensearch.sql.lang.PPLLangSpec.PPL_SPEC; import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME; import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY; @@ -19,27 +17,16 @@ import org.apache.logging.log4j.ThreadContext; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; -import org.opensearch.core.rest.RestStatus; -import org.opensearch.index.IndexNotFoundException; -import org.opensearch.rest.BytesRestResponse; -import org.opensearch.rest.RestChannel; import org.opensearch.sql.api.UnifiedQueryContext; import org.opensearch.sql.api.UnifiedQueryPlanner; import org.opensearch.sql.calcite.CalcitePlanContext; import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit; -import org.opensearch.sql.common.antlr.SyntaxCheckException; import org.opensearch.sql.common.response.ResponseListener; -import org.opensearch.sql.datasources.exceptions.DataSourceClientException; -import org.opensearch.sql.exception.ExpressionEvaluationException; -import org.opensearch.sql.exception.QueryEngineException; -import 
org.opensearch.sql.exception.SemanticCheckException; +import org.opensearch.sql.executor.ExecutionEngine.QueryResponse; import org.opensearch.sql.executor.QueryType; import org.opensearch.sql.executor.analytics.AnalyticsExecutionEngine; import org.opensearch.sql.executor.analytics.QueryPlanExecutor; import org.opensearch.sql.lang.LangSpec; -import org.opensearch.sql.legacy.metrics.MetricName; -import org.opensearch.sql.legacy.metrics.Metrics; -import org.opensearch.sql.opensearch.response.error.ErrorMessageFactory; import org.opensearch.sql.plugin.rest.analytics.stub.StubIndexDetector; import org.opensearch.sql.plugin.rest.analytics.stub.StubSchemaProvider; import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse; @@ -76,32 +63,16 @@ public static boolean isAnalyticsIndex(String query) { } /** - * Execute a query through the unified query pipeline on the sql-worker thread pool. + * Execute a query through the unified query pipeline on the sql-worker thread pool. Called from + * {@code TransportPPLQueryAction} which handles PPL enabled check, metrics, request ID, and + * profiling cleanup. * - * @param query the PPL query string + * @param query the query string * @param queryType SQL or PPL - * @param channel the REST channel for sending the response - */ - public void execute(String query, QueryType queryType, RestChannel channel) { - client - .threadPool() - .schedule( - withCurrentContext(() -> doExecute(query, queryType, channel)), - new TimeValue(0), - SQL_WORKER_THREAD_POOL_NAME); - } - - /** - * Execute a query through the unified query pipeline, returning the result via transport action - * listener. Called from {@code TransportPPLQueryAction} for proper PPL enabled check, metrics, - * and request ID handling. 
- * - * @param query the PPL query string - * @param queryType SQL or PPL - * @param pplRequest the original PPL request (for format selection) + * @param pplRequest the original PPL request * @param listener the transport action listener */ - public void executeViaTransport( + public void execute( String query, QueryType queryType, PPLQueryRequest pplRequest, @@ -109,12 +80,12 @@ public void executeViaTransport( client .threadPool() .schedule( - withCurrentContext(() -> doExecuteViaTransport(query, queryType, pplRequest, listener)), + withCurrentContext(() -> doExecute(query, queryType, pplRequest, listener)), new TimeValue(0), SQL_WORKER_THREAD_POOL_NAME); } - private void doExecuteViaTransport( + private void doExecute( String query, QueryType queryType, PPLQueryRequest pplRequest, @@ -145,7 +116,7 @@ private void doExecuteViaTransport( queryType); analyticsEngine.execute( - plan, planContext, createTransportQueryListener(queryType, planTime, listener)); + plan, planContext, createQueryListener(queryType, planTime, listener)); } } catch (Exception e) { listener.onFailure(e); @@ -163,7 +134,7 @@ private static RelNode addQuerySizeLimit(RelNode plan, CalcitePlanContext contex context.relBuilder.literal(context.sysLimit.querySizeLimit())); } - private ResponseListener createTransportQueryListener( + private ResponseListener createQueryListener( QueryType queryType, long planEndTime, ActionListener transportListener) { @@ -191,115 +162,6 @@ public void onFailure(Exception e) { }; } - private void doExecute(String query, QueryType queryType, RestChannel channel) { - try { - long startTime = System.nanoTime(); - - // TODO: Replace with EngineContext.getSchema() when analytics engine is ready - AbstractSchema schema = StubSchemaProvider.buildSchema(); - - try (UnifiedQueryContext context = - UnifiedQueryContext.builder() - .language(queryType) - .catalog(SCHEMA_NAME, schema) - .defaultNamespace(SCHEMA_NAME) - .build()) { - - UnifiedQueryPlanner planner = new 
UnifiedQueryPlanner(context); - RelNode plan = planner.plan(query); - - CalcitePlanContext planContext = context.getPlanContext(); - plan = addQuerySizeLimit(plan, planContext); - - long planTime = System.nanoTime(); - LOG.info( - "[unified] Planning completed in {}ms for {} query", - (planTime - startTime) / 1_000_000, - queryType); - - analyticsEngine.execute( - plan, planContext, createQueryListener(queryType, channel, planTime)); - } - } catch (Exception e) { - recordFailureMetric(queryType, e); - reportError(channel, e); - } - } - - private ResponseListener createQueryListener( - QueryType queryType, RestChannel channel, long planEndTime) { - ResponseFormatter formatter = new JdbcResponseFormatter(PRETTY); - return new ResponseListener() { - @Override - public void onResponse(QueryResponse response) { - long execTime = System.nanoTime(); - LOG.info( - "[unified] Execution completed in {}ms, {} rows returned", - (execTime - planEndTime) / 1_000_000, - response.getResults().size()); - recordSuccessMetric(queryType); - LangSpec langSpec = queryType == QueryType.PPL ? PPL_SPEC : LangSpec.SQL_SPEC; - String result = - formatter.format( - new QueryResult( - response.getSchema(), response.getResults(), response.getCursor(), langSpec)); - channel.sendResponse(new BytesRestResponse(OK, formatter.contentType(), result)); - } - - @Override - public void onFailure(Exception e) { - recordFailureMetric(queryType, e); - reportError(channel, e); - } - }; - } - - /** - * Classify whether the exception is a client error (bad query) or server error (engine bug). Will - * be synced with RestPPLQueryAction#isClientError updates in - * https://github.com/opensearch-project/sql/pull/5266. 
- */ - private static boolean isClientError(Exception e) { - return e instanceof IllegalArgumentException - || e instanceof IndexNotFoundException - || e instanceof SemanticCheckException - || e instanceof ExpressionEvaluationException - || e instanceof QueryEngineException - || e instanceof SyntaxCheckException - || e instanceof DataSourceClientException - || e instanceof IllegalAccessException; - } - - private static void recordSuccessMetric(QueryType queryType) { - if (queryType == QueryType.PPL) { - Metrics.getInstance().getNumericalMetric(MetricName.PPL_REQ_TOTAL).increment(); - Metrics.getInstance().getNumericalMetric(MetricName.PPL_REQ_COUNT_TOTAL).increment(); - } else { - Metrics.getInstance().getNumericalMetric(MetricName.REQ_TOTAL).increment(); - Metrics.getInstance().getNumericalMetric(MetricName.REQ_COUNT_TOTAL).increment(); - } - } - - private static void recordFailureMetric(QueryType queryType, Exception e) { - if (isClientError(e)) { - LOG.warn("[unified] Client error in query execution", e); - Metrics.getInstance() - .getNumericalMetric( - queryType == QueryType.PPL - ? MetricName.PPL_FAILED_REQ_COUNT_CUS - : MetricName.FAILED_REQ_COUNT_CUS) - .increment(); - } else { - LOG.error("[unified] Server error in query execution", e); - Metrics.getInstance() - .getNumericalMetric( - queryType == QueryType.PPL - ? MetricName.PPL_FAILED_REQ_COUNT_SYS - : MetricName.FAILED_REQ_COUNT_SYS) - .increment(); - } - } - /** * Capture current thread context and restore it on the worker thread. Ensures security context * (user identity, permissions) is propagated. Same pattern as {@link @@ -312,12 +174,4 @@ private static Runnable withCurrentContext(final Runnable task) { task.run(); }; } - - private static void reportError(RestChannel channel, Exception e) { - RestStatus status = - isClientError(e) ? 
RestStatus.BAD_REQUEST : RestStatus.INTERNAL_SERVER_ERROR; - channel.sendResponse( - new BytesRestResponse( - status, ErrorMessageFactory.createErrorMessage(e, status.getStatus()).toString())); - } } diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java index f0c0d6c0121..db57e0d84d2 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java @@ -127,7 +127,7 @@ protected void doExecute( // Route to analytics engine for non-Lucene (e.g., Parquet-backed) indices if (RestUnifiedQueryAction.isAnalyticsIndex(transformedRequest.getRequest())) { - unifiedQueryHandler.executeViaTransport( + unifiedQueryHandler.execute( transformedRequest.getRequest(), QueryType.PPL, transformedRequest, clearingListener); return; }