Skip to content

Commit 6b7fbb6

Browse files
authored
Merge pull request #997 from data-integrations/pushdown-metrics
Added support for context metrics in the BigQuery SQL Engine
2 parents e810f5a + df775f1 commit 6b7fbb6

5 files changed

Lines changed: 54 additions & 240 deletions

File tree

src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryJoinDataset.java

Lines changed: 0 additions & 221 deletions
This file was deleted.

src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,15 @@
2727
import com.google.common.annotations.VisibleForTesting;
2828
import com.google.common.base.Strings;
2929
import io.cdap.cdap.api.RuntimeContext;
30+
import io.cdap.cdap.api.SQLEngineContext;
3031
import io.cdap.cdap.api.annotation.Description;
3132
import io.cdap.cdap.api.annotation.Metadata;
3233
import io.cdap.cdap.api.annotation.MetadataProperty;
3334
import io.cdap.cdap.api.annotation.Name;
3435
import io.cdap.cdap.api.annotation.Plugin;
3536
import io.cdap.cdap.api.data.format.StructuredRecord;
3637
import io.cdap.cdap.api.data.schema.Schema;
38+
import io.cdap.cdap.api.metrics.Metrics;
3739
import io.cdap.cdap.etl.api.PipelineConfigurer;
3840
import io.cdap.cdap.etl.api.connector.Connector;
3941
import io.cdap.cdap.etl.api.engine.sql.BatchSQLEngine;
@@ -115,8 +117,8 @@ public class BigQuerySQLEngine
115117
private String dataset;
116118
private String bucket;
117119
private String runId;
118-
private Map<String, String> tableNames;
119120
private Map<String, BigQuerySQLDataset> datasets;
121+
private Metrics metrics;
120122

121123
@SuppressWarnings("unused")
122124
public BigQuerySQLEngine(BigQuerySQLEngineConfig sqlEngineConfig) {
@@ -132,14 +134,13 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
132134
}
133135

134136
@Override
135-
public void prepareRun(RuntimeContext context) throws Exception {
137+
public void prepareRun(SQLEngineContext context) throws Exception {
136138
super.prepareRun(context);
137139

138140
// Validate configuration and throw exception if the supplied configuration is invalid.
139141
sqlEngineConfig.validate();
140142

141143
runId = BigQuerySQLEngineUtils.newIdentifier();
142-
tableNames = new HashMap<>();
143144
datasets = new HashMap<>();
144145

145146
String serviceAccount = sqlEngineConfig.getServiceAccount();
@@ -171,10 +172,13 @@ public void prepareRun(RuntimeContext context) throws Exception {
171172

172173
// Configure credentials for the source
173174
BigQuerySourceUtils.configureServiceAccount(configuration, sqlEngineConfig.connection);
175+
176+
// Get metrics instance
177+
metrics = context.getMetrics();
174178
}
175179

176180
@Override
177-
public void onRunFinish(boolean succeeded, RuntimeContext context) {
181+
public void onRunFinish(boolean succeeded, SQLEngineContext context) {
178182
super.onRunFinish(succeeded, context);
179183

180184
String gcsPath;
@@ -358,7 +362,8 @@ public SQLWriteResult write(SQLWriteRequest writeRequest) {
358362
sqlEngineConfig,
359363
bigQuery,
360364
writeRequest,
361-
sourceTableId);
365+
sourceTableId,
366+
metrics);
362367
return bigQueryWrite.write();
363368
}
364369

@@ -529,7 +534,8 @@ private BigQuerySelectDataset executeSelect(String datasetName,
529534
table,
530535
jobId,
531536
jobType,
532-
query
537+
query,
538+
metrics
533539
).execute();
534540

535541
datasets.put(datasetName, selectDataset);

src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySelectDataset.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.google.cloud.bigquery.TableDefinition;
1212
import com.google.cloud.bigquery.TableId;
1313
import io.cdap.cdap.api.data.schema.Schema;
14+
import io.cdap.cdap.api.metrics.Metrics;
1415
import io.cdap.cdap.etl.api.engine.sql.SQLEngineException;
1516
import io.cdap.cdap.etl.api.engine.sql.dataset.SQLDataset;
1617
import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils;
@@ -19,6 +20,8 @@
1920
import org.slf4j.LoggerFactory;
2021

2122
import java.util.Collections;
23+
import java.util.HashMap;
24+
import java.util.Map;
2225
import javax.annotation.Nullable;
2326

2427
/**
@@ -38,6 +41,7 @@ public class BigQuerySelectDataset implements SQLDataset, BigQuerySQLDataset {
3841
private final String jobId;
3942
private final BigQueryJobType operation;
4043
private final String selectQuery;
44+
private final Metrics metrics;
4145
private Long numRows;
4246

4347
public static BigQuerySelectDataset getInstance(String datasetName,
@@ -49,7 +53,8 @@ public static BigQuerySelectDataset getInstance(String datasetName,
4953
String bqTable,
5054
String jobId,
5155
BigQueryJobType jobType,
52-
String selectQuery) {
56+
String selectQuery,
57+
Metrics metrics) {
5358

5459
return new BigQuerySelectDataset(datasetName,
5560
outputSchema,
@@ -60,7 +65,8 @@ public static BigQuerySelectDataset getInstance(String datasetName,
6065
bqTable,
6166
jobId,
6267
jobType,
63-
selectQuery);
68+
selectQuery,
69+
metrics);
6470
}
6571

6672
private BigQuerySelectDataset(String datasetName,
@@ -72,7 +78,8 @@ private BigQuerySelectDataset(String datasetName,
7278
String bqTable,
7379
String jobId,
7480
BigQueryJobType operation,
75-
String selectQuery) {
81+
String selectQuery,
82+
Metrics metrics) {
7683
this.datasetName = datasetName;
7784
this.outputSchema = outputSchema;
7885
this.sqlEngineConfig = sqlEngineConfig;
@@ -83,6 +90,7 @@ private BigQuerySelectDataset(String datasetName,
8390
this.jobId = jobId;
8491
this.operation = operation;
8592
this.selectQuery = selectQuery;
93+
this.metrics = metrics;
8694
}
8795

8896
public BigQuerySelectDataset execute() {
@@ -124,14 +132,14 @@ public BigQuerySelectDataset execute() {
124132
if (queryJob == null) {
125133
throw new SQLEngineException("BigQuery job not found: " + jobId);
126134
} else if (queryJob.getStatus().getError() != null) {
127-
BigQuerySQLEngineUtils.logJobMetrics(queryJob);
135+
BigQuerySQLEngineUtils.logJobMetrics(queryJob, metrics);
128136
throw new SQLEngineException(String.format(
129137
"Error executing BigQuery Job: '%s' in Project '%s', Dataset '%s', Location'%s' : %s",
130138
jobId, project, bqDataset, location, queryJob.getStatus().getError().toString()));
131139
}
132140

133141
LOG.info("Created BigQuery table `{}` using Job: {}", bqTable, jobId);
134-
BigQuerySQLEngineUtils.logJobMetrics(queryJob);
142+
BigQuerySQLEngineUtils.logJobMetrics(queryJob, metrics);
135143
return this;
136144
}
137145

0 commit comments

Comments
 (0)