1 change: 1 addition & 0 deletions crates/core/src/schema/mod.rs
@@ -7,6 +7,7 @@ mod table_info;
use alloc::{rc::Rc, vec::Vec};
pub use common::{ColumnFilter, SchemaTable};
use powersync_sqlite_nostd::{self as sqlite, Connection, Context, Value, args};
pub use raw_table::InferredSchemaCache;
use serde::Deserialize;
use sqlite::ResultCode;
pub use table_info::{
225 changes: 207 additions & 18 deletions crates/core/src/schema/raw_table.rs
@@ -1,21 +1,27 @@
use core::fmt::{self, Formatter, Write, from_fn};
use core::{
cell::RefCell,
fmt::{self, Formatter, Write, from_fn},
};

use alloc::{
collections::btree_map::BTreeMap,
format,
rc::Rc,
string::{String, ToString},
vec,
vec::Vec,
};
use powersync_sqlite_nostd::{Connection, Destructor, ResultCode};
use powersync_sqlite_nostd::{self as sqlite, Connection, Destructor, ResultCode};

use crate::{
error::PowerSyncError,
schema::{ColumnFilter, RawTable, SchemaTable},
schema::{ColumnFilter, PendingStatement, PendingStatementValue, RawTable, SchemaTable},
utils::{InsertIntoCrud, SqlBuffer, WriteType},
views::table_columns_to_json_object,
};

pub struct InferredTableStructure {
pub name: String,
pub columns: Vec<String>,
}

@@ -24,7 +30,7 @@ impl InferredTableStructure {
table_name: &str,
db: impl Connection,
synced_columns: &Option<ColumnFilter>,
) -> Result<Option<Self>, PowerSyncError> {
) -> Result<Self, PowerSyncError> {
let stmt = db.prepare_v2("select name from pragma_table_info(?)")?;
stmt.bind_text(1, table_name, Destructor::STATIC)?;

@@ -45,17 +51,173 @@
}

if !has_id_column && columns.is_empty() {
Ok(None)
Err(PowerSyncError::argument_error(format!(
"Could not find {table_name} in local schema."
)))
} else if !has_id_column {
Err(PowerSyncError::argument_error(format!(
"Table {table_name} has no id column."
)))
} else {
Ok(Some(Self { columns }))
Ok(Self {
name: table_name.to_string(),
columns,
})
}
}

/// Generates a statement of the form `INSERT INTO $tbl ($cols) VALUES (?, ...) ON CONFLICT (id)
/// DO UPDATE SET ...` for the sync client.
pub fn infer_put_stmt(&self) -> PendingStatement {
let mut buffer = SqlBuffer::new();
let mut params = vec![];

buffer.push_str("INSERT INTO ");
let _ = buffer.identifier().write_str(&self.name);
buffer.push_str(" (id");
for column in &self.columns {
buffer.comma();
let _ = buffer.identifier().write_str(column);
}
buffer.push_str(") VALUES (?1");
params.push(PendingStatementValue::Id);
for (i, column) in self.columns.iter().enumerate() {
buffer.comma();
let _ = write!(&mut buffer, "?{}", i + 2);
params.push(PendingStatementValue::Column(column.clone()));
}
buffer.push_str(") ON CONFLICT (id) DO UPDATE SET ");
let mut do_update = buffer.comma_separated();
// Generate an "x" = ? for all synced columns to update them without affecting local-only
// columns.
for (i, column) in self.columns.iter().enumerate() {
let entry = do_update.element();
let _ = entry.identifier().write_str(column);
let _ = write!(entry, " = ?{}", i + 2);
}

PendingStatement {
sql: buffer.sql,
params,
named_parameters_index: None,
}
}

/// Generates a statement of the form `DELETE FROM $tbl WHERE id = ?` for the sync client.
pub fn infer_delete_stmt(&self) -> PendingStatement {
let mut buffer = SqlBuffer::new();
buffer.push_str("DELETE FROM ");
let _ = buffer.identifier().write_str(&self.name);
buffer.push_str(" WHERE id = ?");

PendingStatement {
sql: buffer.sql,
params: vec![PendingStatementValue::Id],
named_parameters_index: None,
}
}
}

/// A cache of inferred raw table schema and associated put and delete statements for `sync_local`.
///
/// This cache avoids having to re-generate statements on every (partial) checkpoint in the sync
/// client.
#[derive(Default)]
pub struct InferredSchemaCache {
entries: RefCell<BTreeMap<String, SchemaCacheEntry>>,
}

impl InferredSchemaCache {
pub fn current_schema_version(db: *mut sqlite::sqlite3) -> Result<usize, PowerSyncError> {
let version = db.prepare_v2("PRAGMA schema_version")?;
version.step()?;
let version = version.column_int64(0) as usize;
Ok(version)
}

pub fn infer_put_statement(
&self,
db: *mut sqlite::sqlite3,
schema_version: usize,
tbl: &RawTable,
) -> Result<Rc<PendingStatement>, PowerSyncError> {
self.with_entry(db, schema_version, tbl, SchemaCacheEntry::put)
}

pub fn infer_delete_statement(
&self,
db: *mut sqlite::sqlite3,
schema_version: usize,
tbl: &RawTable,
) -> Result<Rc<PendingStatement>, PowerSyncError> {
self.with_entry(db, schema_version, tbl, SchemaCacheEntry::delete)
}

fn with_entry(
&self,
db: *mut sqlite::sqlite3,
schema_version: usize,
tbl: &RawTable,
f: impl FnOnce(&mut SchemaCacheEntry) -> Rc<PendingStatement>,
) -> Result<Rc<PendingStatement>, PowerSyncError> {
let mut entries = self.entries.borrow_mut();
if let Some(value) = entries.get_mut(&tbl.name) {
if value.schema_version != schema_version {
// Values are outdated, refresh.
*value = SchemaCacheEntry::infer(db, schema_version, tbl)?;
}

Ok(f(value))
} else {
let mut entry = SchemaCacheEntry::infer(db, schema_version, tbl)?;
let stmt = f(&mut entry);
entries.insert(tbl.name.clone(), entry);
Ok(stmt)
}
}
}

pub struct SchemaCacheEntry {
schema_version: usize,
structure: InferredTableStructure,
put_stmt: Option<Rc<PendingStatement>>,
delete_stmt: Option<Rc<PendingStatement>>,
}

impl SchemaCacheEntry {
fn infer(
db: *mut sqlite::sqlite3,
schema_version: usize,
table: &RawTable,
) -> Result<Self, PowerSyncError> {
let local_table_name = table.require_table_name()?;
let structure = InferredTableStructure::read_from_database(
local_table_name,
db,
&table.schema.synced_columns,
)?;

Ok(Self {
schema_version,
structure,
put_stmt: None,
delete_stmt: None,
})
}

fn put(&mut self) -> Rc<PendingStatement> {
self.put_stmt
.get_or_insert_with(|| Rc::new(self.structure.infer_put_stmt()))
.clone()
}

fn delete(&mut self) -> Rc<PendingStatement> {
self.delete_stmt
.get_or_insert_with(|| Rc::new(self.structure.infer_delete_stmt()))
.clone()
}
}

/// Generates a `CREATE TRIGGER` statement to capture writes on raw tables and to forward them to
/// ps-crud.
pub fn generate_raw_table_trigger(
@@ -64,19 +226,10 @@ pub fn generate_raw_table_trigger(
trigger_name: &str,
write: WriteType,
) -> Result<String, PowerSyncError> {
let Some(local_table_name) = table.schema.table_name.as_ref() else {
return Err(PowerSyncError::argument_error("Table has no local name"));
};

let local_table_name = table.require_table_name()?;
let synced_columns = &table.schema.synced_columns;
let Some(resolved_table) =
InferredTableStructure::read_from_database(local_table_name, db, synced_columns)?
else {
return Err(PowerSyncError::argument_error(format!(
"Could not find {} in local schema",
local_table_name
)));
};
let resolved_table =
InferredTableStructure::read_from_database(local_table_name, db, synced_columns)?;

let as_schema_table = SchemaTable::Raw {
definition: table,
Expand Down Expand Up @@ -167,3 +320,39 @@ pub fn generate_raw_table_trigger(
buffer.trigger_end();
Ok(buffer.sql)
}

#[cfg(test)]
mod test {
use alloc::{string::ToString, vec};

use crate::schema::{PendingStatementValue, raw_table::InferredTableStructure};

#[test]
fn infer_sync_statements() {
let structure = InferredTableStructure {
name: "tbl".to_string(),
columns: vec!["foo".to_string(), "bar".to_string()],
};

let put = structure.infer_put_stmt();
assert_eq!(
put.sql,
r#"INSERT INTO "tbl" (id, "foo", "bar") VALUES (?1, ?2, ?3) ON CONFLICT (id) DO UPDATE SET "foo" = ?2, "bar" = ?3"#
);
assert_eq!(put.params.len(), 3);
assert!(matches!(put.params[0], PendingStatementValue::Id));
assert!(matches!(
put.params[1],
PendingStatementValue::Column(ref name) if name == "foo"
));
assert!(matches!(
put.params[2],
PendingStatementValue::Column(ref name) if name == "bar"
));

let delete = structure.infer_delete_stmt();
assert_eq!(delete.sql, r#"DELETE FROM "tbl" WHERE id = ?"#);
assert_eq!(delete.params.len(), 1);
assert!(matches!(delete.params[0], PendingStatementValue::Id));
}
}
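
For context, a minimal sketch of how the sync client could consume this API when a raw table has no explicitly configured statements. The helper name and the `state`/`db`/`table` bindings are assumptions for illustration, not part of this diff; only the types and method signatures are taken from the changes above.

```rust
use alloc::rc::Rc;

use powersync_sqlite_nostd as sqlite;

use crate::{
    error::PowerSyncError,
    schema::{InferredSchemaCache, PendingStatement, RawTable},
    state::DatabaseState,
};

// Hypothetical helper: prefer an explicitly configured `put` statement, otherwise
// infer one from the local table structure through the shared cache.
fn put_statement_for(
    state: &DatabaseState,
    db: *mut sqlite::sqlite3,
    table: &RawTable,
) -> Result<Rc<PendingStatement>, PowerSyncError> {
    if let Some(put) = &table.put {
        // Explicit statements are stored as `Rc`, so reuse is a cheap clone.
        return Ok(put.clone());
    }

    // PRAGMA schema_version acts as the freshness marker: a schema change makes the
    // cache re-infer statements on the next lookup.
    let version = InferredSchemaCache::current_schema_version(db)?;
    state
        .inferred_schema_cache
        .infer_put_statement(db, version, table)
}
```
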
18 changes: 16 additions & 2 deletions crates/core/src/schema/table_info.rs
@@ -1,8 +1,10 @@
use alloc::rc::Rc;
use alloc::string::ToString;
use alloc::vec;
use alloc::{collections::btree_set::BTreeSet, format, string::String, vec::Vec};
use serde::{Deserialize, de::Visitor};

use crate::error::PowerSyncError;
use crate::schema::ColumnFilter;

#[derive(Deserialize)]
@@ -52,8 +54,8 @@ pub struct RawTable {
pub name: String,
#[serde(flatten, default)]
pub schema: RawTableSchema,
pub put: PendingStatement,
pub delete: PendingStatement,
pub put: Option<Rc<PendingStatement>>,
pub delete: Option<Rc<PendingStatement>>,
#[serde(default)]
pub clear: Option<String>,
}
@@ -78,6 +80,18 @@ impl Table {
}
}

impl RawTable {
pub fn require_table_name(&self) -> Result<&str, PowerSyncError> {
let Some(local_table_name) = self.schema.table_name.as_ref() else {
return Err(PowerSyncError::argument_error(format!(
"Raw table {} has no local name",
self.name,
)));
};
Ok(local_table_name)
}
}

#[derive(Deserialize)]
pub struct Column {
pub name: String,
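
As a rough illustration of what the now-optional `put` field holds when it is configured explicitly rather than inferred, the sketch below builds a statement equivalent to the inferred one in the test above. The field types (`String` for `sql`, `Vec<PendingStatementValue>` for `params`) are read off how this diff constructs `PendingStatement`; the table and column names are hypothetical.

```rust
use alloc::{rc::Rc, string::ToString, vec};

use crate::schema::{PendingStatement, PendingStatementValue};

// Illustrative only; mirrors the shape produced by infer_put_stmt() for a table "tbl"
// with a single synced column "foo".
fn example_explicit_put() -> Rc<PendingStatement> {
    Rc::new(PendingStatement {
        sql: r#"INSERT INTO "tbl" (id, "foo") VALUES (?1, ?2) ON CONFLICT (id) DO UPDATE SET "foo" = ?2"#
            .to_string(),
        params: vec![
            PendingStatementValue::Id,
            PendingStatementValue::Column("foo".to_string()),
        ],
        named_parameters_index: None,
    })
}
```

A `RawTable` carrying `put: Some(...)` like this bypasses inference entirely; leaving it as `None` defers to the cache added in raw_table.rs.
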
8 changes: 7 additions & 1 deletion crates/core/src/state.rs
@@ -11,7 +11,10 @@ use alloc::{
use powersync_sqlite_nostd::{self as sqlite, Context};
use sqlite::{Connection, ResultCode};

use crate::{schema::Schema, sync::SyncClient};
use crate::{
schema::{InferredSchemaCache, Schema},
sync::SyncClient,
};

/// State that is shared for a SQLite database connection after the core extension has been
/// registered on it.
@@ -25,6 +28,9 @@ pub struct DatabaseState {
pending_updates: RefCell<BTreeSet<String>>,
commited_updates: RefCell<BTreeSet<String>>,
pub sync_client: RefCell<Option<SyncClient>>,
/// Cached put and delete statements for raw tables, used by the `sync_local` step of the sync
/// client.
pub inferred_schema_cache: InferredSchemaCache,
}

impl DatabaseState {
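
To round this out, a hedged sketch of the invalidation behaviour the `schema_version` field provides: when the local schema changes between checkpoints, the bumped `PRAGMA schema_version` causes the cache to re-infer its statements. The `state`/`db`/`table` bindings, the `todos` DDL, and the availability of `exec_safe` on the connection are assumptions for illustration.

```rust
use powersync_sqlite_nostd::{self as sqlite, Connection};

use crate::{
    error::PowerSyncError,
    schema::{InferredSchemaCache, RawTable},
    state::DatabaseState,
};

// Hypothetical walkthrough of a schema migration invalidating the cached statements.
fn refresh_after_migration(
    state: &DatabaseState,
    db: *mut sqlite::sqlite3,
    table: &RawTable,
) -> Result<(), PowerSyncError> {
    let v1 = InferredSchemaCache::current_schema_version(db)?;
    let _before = state.inferred_schema_cache.infer_put_statement(db, v1, table)?;

    // Any DDL statement bumps PRAGMA schema_version...
    db.exec_safe("ALTER TABLE todos ADD COLUMN extra TEXT")?;

    // ...so the next lookup sees a newer version and rebuilds the cached entry.
    let v2 = InferredSchemaCache::current_schema_version(db)?;
    let _after = state.inferred_schema_cache.infer_put_statement(db, v2, table)?;
    Ok(())
}
```
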