Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ public void explain(
ResponseListener<ExplainResponse> listener) {
try {
String logical = RelOptUtil.toString(plan, mode.toExplainLevel());
listener.onResponse(new ExplainResponse(new ExplainResponseNodeV2(logical, null, null)));
ExplainResponse response =
new ExplainResponse(new ExplainResponseNodeV2(logical, null, null));
listener.onResponse(ExplainResponse.normalizeLf(response));
} catch (Exception e) {
listener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.sql;

import static org.opensearch.sql.legacy.TestUtils.isIndexExist;
import static org.opensearch.sql.util.MatcherUtils.assertJsonEqualsIgnoreId;

import com.google.common.io.Resources;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.junit.Test;
import org.opensearch.client.Request;
import org.opensearch.sql.legacy.SQLIntegTestCase;

/**
* Explain integration tests for SQL queries routed through the analytics engine path (Project
* Analytics engine). Validates that SQL queries targeting "parquet_*" indices produce correct
* logical plans via the _plugins/_sql/_explain endpoint.
*
* <p>Expected output files are in resources/expectedOutput/analytics_sql/. Each test compares the
* explain JSON output against its expected file.
*/
@SuppressWarnings("deprecation") // assertJsonEqualsIgnoreId is correct for JSON explain response
public class AnalyticsSQLExplainIT extends SQLIntegTestCase {

@Override
protected void init() throws Exception {
if (!isIndexExist(client(), "parquet_logs")) {
Request request = new Request("PUT", "/parquet_logs");
request.setJsonEntity(
"{"
+ "\"mappings\": {"
+ " \"properties\": {"
+ " \"ts\": {\"type\": \"date\"},"
+ " \"status\": {\"type\": \"integer\"},"
+ " \"message\": {\"type\": \"keyword\"},"
+ " \"ip_addr\": {\"type\": \"keyword\"}"
+ " }"
+ "}"
+ "}");
client().performRequest(request);
}
}

private static String loadExpectedJson(String fileName) {
return loadFromFile("expectedOutput/analytics_sql/" + fileName);
}

private static String loadFromFile(String filename) {
try {
URI uri = Resources.getResource(filename).toURI();
return new String(Files.readAllBytes(Paths.get(uri)));
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Test
public void testExplainSelectStar() throws IOException {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you also check EXPLAIN statement? It's supported by Calcite SQL, e.g., EXPLAIN PLAN FOR ... Ref: https://calcite.apache.org/docs/reference.html

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Investigated — EXPLAIN PLAN FOR is parsed by Calcite as a SqlExplain node, but SqlToRelConverter.convertQueryRecursive() doesn't handle it (throws AssertionError("not a query: ...") in the default case). Calcite treats EXPLAIN as a meta-statement handled at the JDBC layer (CalcitePrepareImpl), not at the SqlToRelConverter level.

To support it, we'd need to detect SqlExplain in CalciteNativeStrategy.plan(), unwrap the inner query, plan it, and return the plan as a formatted result. However, the resulting plan would be identical to what
/_plugins/_sql/_explain already returns for the same query — the only difference is the response format (EXPLAIN PLAN FOR returns the plan as a query result row, /_explain returns it as a JSON explain response)

Do we want to support the EXPLAIN statement?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error log:

curl -s -X POST localhost:9200/_plugins/_sql -H 'Content-Type: application/json' -d '{"query": "EXPLAIN PLAN FOR SELECT * FROM parquet_logs"}'
{
  "error": {
    "reason": "Invalid SQL query",
    "details": "Query must start with SELECT, DELETE, SHOW or DESCRIBE: EXPLAIN PLAN FOR SELECT * FROM parquet_logs",
    "type": "SQLFeatureNotSupportedException"
  },
  "status": 400
}%                      

assertJsonEqualsIgnoreId(
loadExpectedJson("explain_select_star.json"), explainQuery("SELECT * FROM parquet_logs"));
}

@Test
public void testExplainSelectColumns() throws IOException {
assertJsonEqualsIgnoreId(
loadExpectedJson("explain_select_columns.json"),
explainQuery("SELECT ts, status FROM parquet_logs"));
}

@Test
public void testExplainSelectWithWhere() throws IOException {
assertJsonEqualsIgnoreId(
loadExpectedJson("explain_select_where.json"),
explainQuery("SELECT ts, message FROM parquet_logs WHERE status = 200"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.sql;

import static org.opensearch.sql.legacy.TestUtils.isIndexExist;
import static org.opensearch.sql.util.MatcherUtils.rows;
import static org.opensearch.sql.util.MatcherUtils.schema;
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
import static org.opensearch.sql.util.MatcherUtils.verifySchema;

import java.io.IOException;
import org.json.JSONObject;
import org.junit.Test;
import org.opensearch.client.Request;
import org.opensearch.client.ResponseException;
import org.opensearch.sql.legacy.SQLIntegTestCase;

/**
* Integration tests for SQL queries routed through the analytics engine path. Queries targeting
* "parquet_*" indices are routed to {@code RestUnifiedQueryAction} which uses {@code
* AnalyticsExecutionEngine} with a stub {@code QueryPlanExecutor}.
*
* <p>The stub executor returns rows in a fixed order [ts, status, message, ip_addr] regardless of
* the plan. The schema from OpenSearchSchemaBuilder is alphabetical [ip_addr, message, status, ts].
* AnalyticsExecutionEngine maps values by position, so the data values appear mismatched. This is
* expected; the real analytics engine will evaluate the plan correctly.
*/
public class AnalyticsSQLIT extends SQLIntegTestCase {

  @Override
  protected void init() throws Exception {
    createParquetLogsIndex();
  }

  /** Ensures the Parquet-backed test index exists, creating it on first use. */
  private void createParquetLogsIndex() throws IOException {
    if (!isIndexExist(client(), "parquet_logs")) {
      Request createIndex = new Request("PUT", "/parquet_logs");
      createIndex.setJsonEntity(
          "{"
              + "\"mappings\": {"
              + " \"properties\": {"
              + " \"ts\": {\"type\": \"date\"},"
              + " \"status\": {\"type\": \"integer\"},"
              + " \"message\": {\"type\": \"keyword\"},"
              + " \"ip_addr\": {\"type\": \"keyword\"}"
              + " }"
              + "}"
              + "}");
      client().performRequest(createIndex);
    }
  }

  @Test
  public void testSelectStarSchemaAndData() throws IOException {
    JSONObject response = executeQuery("SELECT * FROM parquet_logs");
    // Schema comes from OpenSearchSchemaBuilder and is alphabetical.
    verifySchema(
        response,
        schema("ip_addr", "string"),
        schema("message", "string"),
        schema("status", "integer"),
        schema("ts", "timestamp"));
    // The stub executor emits [ts, status, message, ip_addr] per row and values are mapped by
    // position onto the alphabetical schema, so the pairing looks mismatched — expected with stub.
    verifyDataRows(
        response,
        rows("2024-01-15 10:30:00", 200, "Request completed", "192.168.1.1"),
        rows("2024-01-15 10:31:00", 200, "Health check OK", "192.168.1.2"),
        rows("2024-01-15 10:32:00", 500, "Internal server error", "192.168.1.3"));
  }

  @Test
  public void testSelectSpecificColumns() throws IOException {
    JSONObject response = executeQuery("SELECT status, message FROM parquet_logs");
    verifySchema(response, schema("status", "integer"), schema("message", "string"));
    // Stub values are positional (see class doc), hence timestamps under the "status" column.
    verifyDataRows(
        response,
        rows("2024-01-15 10:30:00", 200),
        rows("2024-01-15 10:31:00", 200),
        rows("2024-01-15 10:32:00", 500));
  }

  @Test(expected = ResponseException.class)
  public void testSyntaxError() throws IOException {
    // Malformed keyword ("SELEC") must surface as an HTTP error from the endpoint.
    executeQuery("SELEC * FROM parquet_logs");
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"calcite": {
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(ts=[$3], status=[$2])\n LogicalTableScan(table=[[opensearch, parquet_logs]])\n"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"calcite": {
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(ip_addr=[$0], message=[$1], status=[$2], ts=[$3])\n LogicalTableScan(table=[[opensearch, parquet_logs]])\n"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"calcite": {
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(ts=[$3], message=[$1])\n LogicalFilter(condition=[=($2, 200)])\n LogicalTableScan(table=[[opensearch, parquet_logs]])\n"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -83,10 +84,21 @@ public class RestSqlAction extends BaseRestHandler {
/** New SQL query request handler. */
private final RestSQLQueryAction newSqlQueryHandler;

public RestSqlAction(Settings settings, Injector injector) {
/**
* Analytics router. Called before the normal SQL engine. Accepts the request and channel, returns
* {@code true} if it handled the request (analytics index), {@code false} to fall through to
* normal SQL engine.
*/
private final BiFunction<SQLQueryRequest, RestChannel, Boolean> analyticsRouter;

public RestSqlAction(
Settings settings,
Injector injector,
BiFunction<SQLQueryRequest, RestChannel, Boolean> analyticsRouter) {
super();
this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings);
this.newSqlQueryHandler = new RestSQLQueryAction(injector);
this.analyticsRouter = analyticsRouter;
}

@Override
Expand Down Expand Up @@ -134,39 +146,65 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli

Format format = SqlRequestParam.getFormat(request.params());

// Route request to new query engine if it's supported already
SQLQueryRequest newSqlRequest =
new SQLQueryRequest(
sqlRequest.getJsonContent(),
sqlRequest.getSql(),
request.path(),
request.params(),
sqlRequest.cursor());
return newSqlQueryHandler.prepareRequest(
newSqlRequest,
(restChannel, exception) -> {
try {
if (newSqlRequest.isExplainRequest()) {
LOG.info(
"Request is falling back to old SQL engine due to: " + exception.getMessage());
}
LOG.info(
"[{}] Request {} is not supported and falling back to old SQL engine",
QueryContext.getRequestId(),
newSqlRequest);
LOG.info("Request Query: {}", QueryDataAnonymizer.anonymizeData(sqlRequest.getSql()));
QueryAction queryAction = explainRequest(client, sqlRequest, format);
executeSqlRequest(request, queryAction, client, restChannel);
} catch (Exception e) {
handleException(restChannel, e);
}
},
this::handleException);

// Route to analytics engine for non-Lucene (e.g., Parquet-backed) indices.
// The router returns true and sends the response directly if it handled the request.
final SQLQueryRequest finalRequest = newSqlRequest;
return channel -> {
if (!analyticsRouter.apply(finalRequest, channel)) {
delegateToV2Engine(request, client, sqlRequest, finalRequest, format, channel);
}
};
} catch (Exception e) {
return channel -> handleException(channel, e);
}
}

  /**
   * Delegates a SQL query to the V2 engine, falling back to the legacy engine when the V2 engine
   * reports the request as unsupported.
   *
   * <p>The fallback callback passed to {@code prepareRequest} runs only when the V2 engine rejects
   * the request: it logs the reason, re-plans the query through the legacy {@code explainRequest}
   * path, and executes it with {@code executeSqlRequest}. Any failure in either path is funneled
   * through {@code handleException} so the channel always receives a response.
   *
   * @param request the raw REST request (needed by the legacy execution path)
   * @param client node client used for legacy planning and execution
   * @param sqlRequest legacy-format request (source of the SQL text for logging/planning)
   * @param sqlQueryRequest V2-format request handed to the new engine
   * @param format requested response format for the legacy path
   * @param channel channel the final response (success or error) is written to
   */
  private void delegateToV2Engine(
      RestRequest request,
      NodeClient client,
      SqlRequest sqlRequest,
      SQLQueryRequest sqlQueryRequest,
      Format format,
      RestChannel channel) {
    try {
      newSqlQueryHandler
          .prepareRequest(
              sqlQueryRequest,
              // Fallback invoked when the V2 engine cannot handle the request.
              (restChannel, exception) -> {
                try {
                  if (sqlQueryRequest.isExplainRequest()) {
                    // Explain requests log the concrete rejection reason for diagnosis.
                    LOG.info(
                        "Request is falling back to old SQL engine due to: "
                            + exception.getMessage());
                  }
                  LOG.info(
                      "[{}] Request {} is not supported and falling back to old SQL engine",
                      QueryContext.getRequestId(),
                      sqlQueryRequest);
                  // Anonymize the query text before logging to avoid leaking user data.
                  LOG.info(
                      "Request Query: {}", QueryDataAnonymizer.anonymizeData(sqlRequest.getSql()));
                  QueryAction queryAction = explainRequest(client, sqlRequest, format);
                  executeSqlRequest(request, queryAction, client, restChannel);
                } catch (Exception e) {
                  handleException(restChannel, e);
                }
              },
              this::handleException)
          // prepareRequest returns a RestChannelConsumer; run it against this channel now.
          .accept(channel);
    } catch (Exception e) {
      handleException(channel, e);
    }
  }

private void handleException(RestChannel restChannel, Exception exception) {
logAndPublishMetrics(exception);
if (exception instanceof OpenSearchException) {
Expand Down
1 change: 1 addition & 0 deletions plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ def getJobSchedulerPlugin() {

testClusters.integTest {
plugin(getJobSchedulerPlugin())
plugin provider { (RegularFile) (() -> file("${rootDir}/libs/analytics-engine-3.6.0-SNAPSHOT.zip")) }
plugin(project.tasks.bundlePlugin.archiveFile)
testDistribution = "ARCHIVE"

Expand Down
Loading
Loading