Skip to content

Commit eebfef2

Browse files
committed
chore: refactor return types
1 parent 671519a commit eebfef2

11 files changed

Lines changed: 382 additions & 367 deletions

src/spanner/src/client.rs

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ use std::sync::Arc;
2525
/// .execute_query(Statement::builder().sql("select 'Hello World!' as greeting"))
2626
/// .await?;
2727
///
28-
/// while let Some(row) = results.next().await? {
28+
/// while let Some(row) = results.next().await {
29+
/// let row = row?;
2930
/// let greeting: Option<String> = row.get("greeting");
3031
/// println!("Greeting from Spanner: {:?}", greeting);
3132
/// }
@@ -76,7 +77,7 @@ impl Spanner {
7677
&self,
7778
request: crate::model::CreateSessionRequest,
7879
options: crate::RequestOptions,
79-
) -> Result<crate::model::Session, crate::Error> {
80+
) -> crate::Result<crate::model::Session> {
8081
self.inner
8182
.create_session()
8283
.with_request(request)
@@ -89,15 +90,15 @@ impl Spanner {
8990
&self,
9091
request: crate::model::ExecuteSqlRequest,
9192
options: crate::RequestOptions,
92-
) -> Result<crate::model::ResultSet, crate::Error> {
93+
) -> crate::Result<crate::model::ResultSet> {
9394
self.inner.execute_sql().with_request(request).with_options(options).send().await
9495
}
9596

9697
pub async fn execute_batch_dml(
9798
&self,
9899
request: crate::model::ExecuteBatchDmlRequest,
99100
options: crate::RequestOptions,
100-
) -> Result<crate::model::ExecuteBatchDmlResponse, crate::Error> {
101+
) -> crate::Result<crate::model::ExecuteBatchDmlResponse> {
101102
self.inner
102103
.execute_batch_dml()
103104
.with_request(request)
@@ -114,15 +115,15 @@ impl Spanner {
114115
&self,
115116
request: crate::model::ReadRequest,
116117
options: crate::RequestOptions,
117-
) -> Result<crate::model::ResultSet, crate::Error> {
118+
) -> crate::Result<crate::model::ResultSet> {
118119
self.inner.read().with_request(request).with_options(options).send().await
119120
}
120121

121122
pub async fn begin_transaction(
122123
&self,
123124
request: crate::model::BeginTransactionRequest,
124125
options: crate::RequestOptions,
125-
) -> Result<crate::model::Transaction, crate::Error> {
126+
) -> crate::Result<crate::model::Transaction> {
126127
self.inner
127128
.begin_transaction()
128129
.with_request(request)
@@ -135,15 +136,15 @@ impl Spanner {
135136
&self,
136137
request: crate::model::CommitRequest,
137138
options: crate::RequestOptions,
138-
) -> Result<crate::model::CommitResponse, crate::Error> {
139+
) -> crate::Result<crate::model::CommitResponse> {
139140
self.inner.commit().with_request(request).with_options(options).send().await
140141
}
141142

142143
pub async fn rollback(
143144
&self,
144145
request: crate::model::RollbackRequest,
145146
options: crate::RequestOptions,
146-
) -> Result<(), crate::Error> {
147+
) -> crate::Result<()> {
147148
self.inner.rollback().with_request(request).with_options(options).send().await
148149
}
149150

@@ -172,7 +173,7 @@ impl Spanner {
172173
pub async fn database_client(
173174
&self,
174175
database: impl Into<String>,
175-
) -> Result<crate::database_client::DatabaseClient, crate::Error> {
176+
) -> crate::Result<crate::database_client::DatabaseClient> {
176177
let mut request = crate::model::CreateSessionRequest::new();
177178
request.database = database.into();
178179

@@ -431,9 +432,8 @@ mod tests {
431432

432433
let chunk2 = stream
433434
.next_message()
434-
.await
435-
.expect("Stream shouldn't return error");
436-
assert!(chunk2.is_none(), "Stream should be exhausted");
435+
.await;
436+
assert!(chunk2.is_none());
437437
}
438438

439439
#[tokio::test]
@@ -508,8 +508,7 @@ mod tests {
508508

509509
let chunk3 = stream
510510
.next_message()
511-
.await
512-
.expect("Stream shouldn't return error");
511+
.await;
513512
assert!(chunk3.is_none(), "Stream should be exhausted");
514513
}
515514

@@ -618,8 +617,7 @@ mod tests {
618617

619618
let chunk3 = stream
620619
.next_message()
621-
.await
622-
.expect("Stream shouldn't return error");
620+
.await;
623621
assert!(chunk3.is_none(), "Stream should be exhausted");
624622
}
625623

src/spanner/src/client/builder.rs

Lines changed: 0 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -99,112 +99,6 @@ impl ExecuteStreamingSql {
9999

100100
Ok(ServerStream::new(stream_res.into_inner()))
101101
}
102-
103-
// Setters mirroring ExecuteSql request fields
104-
105-
/// Sets the value of session.
106-
pub fn set_session<T: Into<std::string::String>>(mut self, v: T) -> Self {
107-
self.request.session = v.into();
108-
self
109-
}
110-
111-
/// Sets the value of transaction.
112-
pub fn set_transaction<T: Into<crate::model::TransactionSelector>>(mut self, v: T) -> Self {
113-
self.request.transaction = Some(v.into());
114-
self
115-
}
116-
117-
/// Sets the value of sql.
118-
pub fn set_sql<T: Into<std::string::String>>(mut self, v: T) -> Self {
119-
self.request.sql = v.into();
120-
self
121-
}
122-
123-
/// Sets the value of params.
124-
pub fn set_params<T: Into<wkt::Struct>>(mut self, v: T) -> Self {
125-
self.request.params = Some(v.into());
126-
self
127-
}
128-
129-
/// Sets the value of param_types.
130-
pub fn set_param_types<T, K, V>(mut self, v: T) -> Self
131-
where
132-
T: std::iter::IntoIterator<Item = (K, V)>,
133-
K: Into<String>,
134-
V: Into<crate::model::Type>,
135-
{
136-
self.request.param_types = v.into_iter().map(|(k, v)| (k.into(), v.into())).collect();
137-
self
138-
}
139-
140-
/// Sets the value of resume_token.
141-
pub fn set_resume_token<T: Into<::bytes::Bytes>>(mut self, v: T) -> Self {
142-
self.request.resume_token = v.into();
143-
self
144-
}
145-
146-
/// Sets the value of query_mode.
147-
pub fn set_query_mode<T: Into<crate::model::execute_sql_request::QueryMode>>(
148-
mut self,
149-
v: T,
150-
) -> Self {
151-
self.request.query_mode = v.into();
152-
self
153-
}
154-
155-
/// Sets the value of partition_token.
156-
pub fn set_partition_token<T: Into<::bytes::Bytes>>(mut self, v: T) -> Self {
157-
self.request.partition_token = v.into();
158-
self
159-
}
160-
161-
/// Sets the value of seqno.
162-
pub fn set_seqno<T: Into<i64>>(mut self, v: T) -> Self {
163-
self.request.seqno = v.into();
164-
self
165-
}
166-
167-
/// Sets the value of query_options.
168-
pub fn set_query_options<T: Into<crate::model::execute_sql_request::QueryOptions>>(
169-
mut self,
170-
v: T,
171-
) -> Self {
172-
self.request.query_options = Some(v.into());
173-
self
174-
}
175-
176-
/// Sets the value of request_options.
177-
pub fn set_request_options<T: Into<crate::model::RequestOptions>>(mut self, v: T) -> Self {
178-
self.request.request_options = Some(v.into());
179-
self
180-
}
181-
182-
/// Sets the value of directed_read_options.
183-
pub fn set_directed_read_options<T: Into<crate::model::DirectedReadOptions>>(
184-
mut self,
185-
v: T,
186-
) -> Self {
187-
self.request.directed_read_options = Some(v.into());
188-
self
189-
}
190-
191-
/// Sets the value of data_boost_enabled.
192-
pub fn set_data_boost_enabled<T: Into<bool>>(mut self, v: T) -> Self {
193-
self.request.data_boost_enabled = v.into();
194-
self
195-
}
196-
197-
/// Sets the value of last_statement.
198-
pub fn set_last_statement<T: Into<bool>>(mut self, v: T) -> Self {
199-
self.request.last_statement = v.into();
200-
self
201-
}
202-
203-
/// Sets the value of routing_hint.
204-
pub fn set_routing_hint<T: Into<crate::model::RoutingHint>>(mut self, v: T) -> Self {
205-
self.request.routing_hint = Some(v.into());
206-
self
207-
}
208102
}
209103

210104
impl crate::RequestBuilder for ExecuteStreamingSql {

src/spanner/src/client/stream.rs

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
use crate::google::spanner::v1::BatchWriteResponse;
1616
use crate::google::spanner::v1::PartialResultSet;
17-
use gaxi::grpc::tonic::Result as TonicResult;
1817
use gaxi::grpc::tonic::Streaming;
1918

2019
/// Representation for the `ExecuteStreamingSql` RPC stream.
@@ -30,10 +29,18 @@ impl ServerStream {
3029

3130
/// Fetches the next `PartialResultSet` from the stream.
3231
///
33-
/// Returns `Ok(Some(PartialResultSet))` when a message is successfully received,
34-
/// `Ok(None)` when the stream concludes naturally, or `Err(_)` on RPC errors.
35-
pub async fn next_message(&mut self) -> TonicResult<Option<PartialResultSet>> {
36-
self.inner.message().await
32+
/// Returns `Some(Ok(PartialResultSet))` when a message is successfully received,
33+
/// `None` when the stream concludes naturally, or `Some(Err(_))` on RPC errors.
34+
pub async fn next_message(&mut self) -> Option<crate::Result<PartialResultSet>> {
35+
self.inner
36+
.message()
37+
.await
38+
.map_err(|e| crate::Error::service(
39+
google_cloud_gax::error::rpc::Status::default()
40+
.set_code(e.code() as i32)
41+
.set_message(e.message())
42+
))
43+
.transpose()
3744
}
3845
}
3946

@@ -50,9 +57,17 @@ impl BatchWriteStream {
5057

5158
/// Fetches the next `BatchWriteResponse` from the stream.
5259
///
53-
/// Returns `Ok(Some(BatchWriteResponse))` when a message is successfully received,
54-
/// `Ok(None)` when the stream concludes naturally, or `Err(_)` on RPC errors.
55-
pub async fn next_message(&mut self) -> TonicResult<Option<BatchWriteResponse>> {
56-
self.inner.message().await
60+
/// Returns `Some(Ok(BatchWriteResponse))` when a message is successfully received,
61+
/// `None` when the stream concludes naturally, or `Some(Err(_))` on RPC errors.
62+
pub async fn next_message(&mut self) -> Option<crate::Result<BatchWriteResponse>> {
63+
self.inner
64+
.message()
65+
.await
66+
.map_err(|e| crate::Error::service(
67+
google_cloud_gax::error::rpc::Status::default()
68+
.set_code(e.code() as i32)
69+
.set_message(e.message())
70+
))
71+
.transpose()
5772
}
5873
}

src/spanner/src/database_client.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ impl DatabaseClient {
5151
/// .execute_query("SELECT 1 AS MyCol")
5252
/// .await?;
5353
///
54-
/// while let Some(row) = rs.next().await? {
54+
/// while let Some(row) = rs.next().await {
55+
/// let row = row?;
5556
/// // Access data by column index
5657
/// let val: i64 = row.get(0);
5758
///
@@ -85,11 +86,11 @@ impl DatabaseClient {
8586
/// let mut rs2 = tx.execute_query("SELECT * FROM MyTable2").await?;
8687
///
8788
/// // Read result sets
88-
/// while let Some(row) = rs1.next().await? {
89-
/// let val: i64 = row.get("MyTable1Col");
89+
/// while let Some(row) = rs1.next().await {
90+
/// let val: i64 = row?.get("MyTable1Col");
9091
/// }
91-
/// while let Some(row) = rs2.next().await? {
92-
/// let val: i64 = row.get("MyTable2Col");
92+
/// while let Some(row) = rs2.next().await {
93+
/// let val: i64 = row?.get("MyTable2Col");
9394
/// }
9495
/// # Ok(()) }
9596
/// ```
@@ -114,11 +115,11 @@ impl DatabaseClient {
114115
/// # .database_client("projects/my-project/instances/my-instance/databases/my-database")
115116
/// # .await?;
116117
/// let (row_count, commit_response) = database_client.read_write_transaction().build().await?.run(|tx| {
117-
/// Box::pin(async move {
118+
/// async move {
118119
/// // 1. Read
119120
/// let mut rs = tx.execute_query("SELECT SingerId FROM Singers WHERE FirstName = 'Alice'").await?;
120-
/// let row = match rs.next().await? {
121-
/// Some(row) => row,
121+
/// let row = match rs.next().await {
122+
/// Some(row) => row?,
122123
/// None => return Ok(0),
123124
/// };
124125
/// let singer_id: i64 = row.get("SingerId");
@@ -130,7 +131,7 @@ impl DatabaseClient {
130131
/// ).await?;
131132
///
132133
/// Ok(rows_updated)
133-
/// })
134+
/// }
134135
/// }).await?;
135136
/// # Ok(()) }
136137
/// ```

src/spanner/src/partitioned_dml.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ impl PartitionedDmlTransactionBuilder {
2828
self
2929
}
3030

31-
pub async fn build(self) -> Result<PartitionedDmlTransaction, crate::Error> {
31+
pub async fn build(self) -> crate::Result<PartitionedDmlTransaction> {
3232
let mut request = crate::model::BeginTransactionRequest::new();
3333
request.session = self.session.name.clone();
3434
request.options = Some(self.options);
@@ -56,7 +56,7 @@ pub struct PartitionedDmlTransaction {
5656
}
5757

5858
impl PartitionedDmlTransaction {
59-
pub async fn execute(self, statement: impl Into<Statement>) -> Result<i64, crate::Error> {
59+
pub async fn execute(self, statement: impl Into<Statement>) -> crate::Result<i64> {
6060
let statement: Statement = statement.into();
6161
let (mut request, options) = statement.build_request(self.session.name.clone());
6262
request.transaction = Some(self.tx_selector);

0 commit comments

Comments
 (0)