From 66f7098ed5ace7d759898046fc21d43959a0ae97 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 29 Jan 2026 11:06:47 +0100 Subject: [PATCH 01/10] Update cargo valgrind --- .github/workflows/tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index c02ec5f..da368c4 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -210,7 +210,7 @@ jobs: - name: Install Cargo Valgrind # TODO: Use released version. Currently we rely on the git version while we wait for this # to be released: https://github.com/jfrimmel/cargo-valgrind/commit/408c0b4fb56e84eddc2bb09c88a11ba3adc0c188 - run: cargo install --git https://github.com/jfrimmel/cargo-valgrind cargo-valgrind + run: cargo install --git https://github.com/jfrimmel/cargo-valgrind #run: cargo install cargo-valgrind - name: Test Core From 2e83a99475090ed687c3cf65e4dcda74d8a82564 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 29 Jan 2026 11:17:29 +0100 Subject: [PATCH 02/10] Fix install script --- .github/workflows/tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index da368c4..c02ec5f 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -210,7 +210,7 @@ jobs: - name: Install Cargo Valgrind # TODO: Use released version. Currently we rely on the git version while we wait for this # to be released: https://github.com/jfrimmel/cargo-valgrind/commit/408c0b4fb56e84eddc2bb09c88a11ba3adc0c188 - run: cargo install --git https://github.com/jfrimmel/cargo-valgrind + run: cargo install --git https://github.com/jfrimmel/cargo-valgrind cargo-valgrind #run: cargo install cargo-valgrind - name: Test Core From 4fee3cbfca9911779959f669358c56a8c3dbeacc Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 29 Jan 2026 12:14:41 +0100 Subject: [PATCH 03/10] Start inferring schema dynamically --- crates/core/src/schema/mod.rs | 1 + crates/core/src/schema/raw_table.rs | 46 ++++++++++++++++++++++++++++ crates/core/src/schema/table_info.rs | 23 ++++++++++++++ 3 files changed, 70 insertions(+) create mode 100644 crates/core/src/schema/raw_table.rs diff --git a/crates/core/src/schema/mod.rs b/crates/core/src/schema/mod.rs index a8c7c88..fbe1306 100644 --- a/crates/core/src/schema/mod.rs +++ b/crates/core/src/schema/mod.rs @@ -1,5 +1,6 @@ pub mod inspection; mod management; +mod raw_table; mod table_info; use alloc::{rc::Rc, vec::Vec}; diff --git a/crates/core/src/schema/raw_table.rs b/crates/core/src/schema/raw_table.rs new file mode 100644 index 0000000..5343cfd --- /dev/null +++ b/crates/core/src/schema/raw_table.rs @@ -0,0 +1,46 @@ +use alloc::{ + string::{String, ToString}, + vec, + vec::Vec, +}; +use powersync_sqlite_nostd::{Connection, Destructor, ResultCode}; + +use crate::error::PowerSyncError; + +pub struct InferredTableStructure { + pub table_name: String, + pub has_id_column: bool, + pub columns: Vec, +} + +impl InferredTableStructure { + pub fn read_from_database( + table_name: &str, + db: impl Connection, + ) -> Result, PowerSyncError> { + let stmt = db.prepare_v2("select name from pragma_table_info(?)")?; + stmt.bind_text(1, table_name, Destructor::STATIC)?; + + let mut has_id_column = false; + let mut columns = vec![]; + + while let ResultCode::ROW = stmt.step()? 
{
+            let name = stmt.column_text(0)?;
+            if name == "id" {
+                has_id_column = true;
+            } else {
+                columns.push(name.to_string());
+            }
+        }
+
+        if !has_id_column && columns.is_empty() {
+            Ok(None)
+        } else {
+            Ok(Some(Self {
+                table_name: table_name.to_string(),
+                has_id_column,
+                columns,
+            }))
+        }
+    }
+}
diff --git a/crates/core/src/schema/table_info.rs b/crates/core/src/schema/table_info.rs
index aec1e4b..f92c15d 100644
--- a/crates/core/src/schema/table_info.rs
+++ b/crates/core/src/schema/table_info.rs
@@ -21,9 +21,32 @@ pub struct Table {
     pub flags: TableInfoFlags,
 }
 
+#[derive(Deserialize)]
+pub struct RawTableSchema {
+    /// The actual name of the raw table in the local schema.
+    ///
+    /// Currently, this is only used to generate `CREATE TRIGGER` statements for the raw table.
+    #[serde(default)]
+    table_name: Option<String>,
+    #[serde(
+        default,
+        rename = "include_old",
+        deserialize_with = "deserialize_include_old"
+    )]
+    pub diff_include_old: Option<DiffIncludeOld>,
+    #[serde(flatten)]
+    pub flags: TableInfoFlags,
+}
+
 #[derive(Deserialize)]
 pub struct RawTable {
+    /// The [crate::sync::line::OplogEntry::object_type] for which rows should be forwarded to this
+    /// raw table.
+    ///
+    /// This is not necessarily the same as the local name of the raw table.
     pub name: String,
+    #[serde(flatten, default)]
+    pub schema: Option<RawTableSchema>,
     pub put: PendingStatement,
     pub delete: PendingStatement,
     #[serde(default)]

From 1a68ed656645e8e3ad5f4b4c34bc8ecd1779fa07 Mon Sep 17 00:00:00 2001
From: Simon Binder
Date: Wed, 11 Feb 2026 10:57:57 +0100
Subject: [PATCH 04/10] Generate triggers for raw tables

---
 crates/core/src/schema/common.rs     |  81 +++++++++++++
 crates/core/src/schema/mod.rs        |  55 ++++++++-
 crates/core/src/schema/raw_table.rs  | 122 +++++++++++++++++--
 crates/core/src/schema/table_info.rs |  55 ++++-----
 crates/core/src/utils/mod.rs         |   2 +-
 crates/core/src/utils/sql_buffer.rs  | 147 +++++++++++++++++----
 crates/core/src/views.rs             | 171 ++++++++++-----------------
 7 files changed, 461 insertions(+), 172 deletions(-)
 create mode 100644 crates/core/src/schema/common.rs

diff --git a/crates/core/src/schema/common.rs b/crates/core/src/schema/common.rs
new file mode 100644
index 0000000..aab38cb
--- /dev/null
+++ b/crates/core/src/schema/common.rs
@@ -0,0 +1,81 @@
+use core::slice;
+
+use alloc::{string::String, vec::Vec};
+
+use crate::schema::{
+    Column, CommonTableOptions, RawTable, Table, raw_table::InferredTableStructure,
+};
+
+pub enum SchemaTable<'a> {
+    Json(&'a Table),
+    Raw {
+        definition: &'a RawTable,
+        schema: &'a InferredTableStructure,
+    },
+}
+
+impl<'a> SchemaTable<'a> {
+    pub fn common_options(&self) -> &CommonTableOptions {
+        match self {
+            Self::Json(table) => &table.options,
+            Self::Raw {
+                definition,
+                schema: _,
+            } => &definition.schema.options,
+        }
+    }
+
+    pub fn column_names(&self) -> impl Iterator<Item = &'a str> {
+        match self {
+            Self::Json(table) => SchemaTableColumnIterator::Json(table.columns.iter()),
+            Self::Raw {
+                definition: _,
+                schema,
+            } => SchemaTableColumnIterator::Raw(schema.columns.iter()),
+        }
+    }
+}
+
+impl<'a> From<&'a Table> for SchemaTable<'a> {
+    fn from(value: &'a Table) -> Self {
+        Self::Json(value)
+    }
+}
+
+enum SchemaTableColumnIterator<'a> {
+    Json(slice::Iter<'a, Column>),
+    Raw(slice::Iter<'a, String>),
+}
+
+impl<'a> Iterator for SchemaTableColumnIterator<'a> {
+    type Item = &'a str;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        Some(match self {
+            Self::Json(iter) => &iter.next()?.name,
+            Self::Raw(iter) => iter.next()?.as_ref(),
+        })
+    }
+}
+
+pub struct ColumnFilter {
+    sorted_names: Vec<String>,
+}
+
+impl From<Vec<String>> for ColumnFilter {
+    fn from(mut value: Vec<String>) -> Self {
+        value.sort();
+        Self {
+            sorted_names: value,
+        }
+    }
+}
+
+impl ColumnFilter {
+    /// Whether this filter matches the given column name.
+    pub fn matches(&self, column: &str) -> bool {
+        self.sorted_names
+            .binary_search_by(|item| item.as_str().cmp(column))
+            .is_ok()
+    }
+}
diff --git a/crates/core/src/schema/mod.rs b/crates/core/src/schema/mod.rs
index fbe1306..8082d06 100644
--- a/crates/core/src/schema/mod.rs
+++ b/crates/core/src/schema/mod.rs
@@ -1,18 +1,23 @@
+mod common;
 pub mod inspection;
 mod management;
 mod raw_table;
 mod table_info;
 
 use alloc::{rc::Rc, vec::Vec};
-use powersync_sqlite_nostd as sqlite;
+pub use common::{ColumnFilter, SchemaTable};
+use powersync_sqlite_nostd::{self as sqlite, Connection, Context, Value, args};
 use serde::Deserialize;
 use sqlite::ResultCode;
 pub use table_info::{
-    Column, DiffIncludeOld, PendingStatement, PendingStatementValue, RawTable, Table,
+    Column, CommonTableOptions, PendingStatement, PendingStatementValue, RawTable, Table,
     TableInfoFlags,
 };
 
-use crate::state::DatabaseState;
+use crate::{
+    error::PowerSyncError, schema::raw_table::generate_raw_table_trigger, state::DatabaseState,
+    utils::WriteType,
+};
 
 #[derive(Deserialize, Default)]
 pub struct Schema {
@@ -22,5 +27,47 @@ pub struct Schema {
 }
 
 pub fn register(db: *mut sqlite::sqlite3, state: Rc<DatabaseState>) -> Result<(), ResultCode> {
-    management::register(db, state)
+    management::register(db, state)?;
+
+    {
+        fn create_trigger(
+            context: *mut sqlite::context,
+            args: &[*mut sqlite::value],
+        ) -> Result<(), PowerSyncError> {
+            // Args: Table (JSON), trigger_name, write_type
+            let table: RawTable =
+                serde_json::from_str(args[0].text()).map_err(PowerSyncError::as_argument_error)?;
+            let trigger_name = args[1].text();
+            let write_type: WriteType = args[2].text().parse()?;
+
+            let db = context.db_handle();
+            let create_trigger_stmt =
+                generate_raw_table_trigger(db, &table, trigger_name, write_type)?;
+            db.exec_safe(&create_trigger_stmt)?;
+            Ok(())
+        }
+
+        extern "C" fn create_raw_trigger_sqlite(
+            context: *mut sqlite::context,
+            argc: i32,
+            args: *mut *mut sqlite::value,
+        ) {
+            let args = args!(argc, args);
+            if let Err(e) = create_trigger(context, args) {
+                e.apply_to_ctx("powersync_create_raw_table_crud_trigger", context);
+            }
+        }
+
+        db.create_function_v2(
+            "powersync_create_raw_table_crud_trigger",
+            3,
+            sqlite::UTF8,
+            None,
+            Some(create_raw_trigger_sqlite),
+            None,
+            None,
+            Some(DatabaseState::destroy_rc),
+        )?;
+    }
+    Ok(())
 }
diff --git a/crates/core/src/schema/raw_table.rs b/crates/core/src/schema/raw_table.rs
index 5343cfd..a1e60e4 100644
--- a/crates/core/src/schema/raw_table.rs
+++ b/crates/core/src/schema/raw_table.rs
@@ -1,15 +1,21 @@
+use core::fmt::from_fn;
+
 use alloc::{
+    format,
     string::{String, ToString},
     vec,
     vec::Vec,
 };
 use powersync_sqlite_nostd::{Connection, Destructor, ResultCode};
 
-use crate::error::PowerSyncError;
+use crate::{
+    error::PowerSyncError,
+    schema::{RawTable, SchemaTable},
+    utils::{InsertIntoCrud, SqlBuffer, WriteType},
+    views::table_columns_to_json_object,
+};
 
 pub struct InferredTableStructure {
-    pub table_name: String,
-    pub has_id_column: bool,
     pub columns: Vec<String>,
 }
 
@@ -35,12 +41,112 @@ impl InferredTableStructure {
 
         if !has_id_column && columns.is_empty() {
             Ok(None)
+        } else if !has_id_column {
+            Err(PowerSyncError::argument_error(format!(
+                "Table {table_name} has no id column."
+ ))) + } else { + Ok(Some(Self { columns })) + } + } +} + +/// Generates a `CREATE TRIGGER` statement to capture writes on raw tables and to forward them to +/// ps-crud. +pub fn generate_raw_table_trigger( + db: impl Connection, + table: &RawTable, + trigger_name: &str, + write: WriteType, +) -> Result { + let Some(local_table_name) = table.schema.table_name.as_ref() else { + return Err(PowerSyncError::argument_error("Table has no local name")); + }; + + let Some(resolved_table) = InferredTableStructure::read_from_database(local_table_name, db)? + else { + return Err(PowerSyncError::argument_error(format!( + "Could not find {} in local schema", + local_table_name + ))); + }; + + let as_schema_table = SchemaTable::Raw { + definition: table, + schema: &resolved_table, + }; + + let mut buffer = SqlBuffer::new(); + buffer.create_trigger("", trigger_name); + buffer.trigger_after(write, local_table_name); + // Skip the trigger for writes during sync_local, these aren't crud writes. + buffer.push_str("WHEN NOT powersync_in_sync_operation() BEGIN\n"); + + if table.schema.options.flags.insert_only() { + if write != WriteType::Insert { + // Prevent illegal writes to a table marked as insert-only by raising errors here. + buffer.push_str("SELECT RAISE(FAIL, 'Unexpected update on insert-only table');\n"); } else { - Ok(Some(Self { - table_name: table_name.to_string(), - has_id_column, - columns, - })) + // Write directly to powersync_crud_ to skip writing the $local bucket for insert-only + // tables. + let fragment = table_columns_to_json_object("NEW", &as_schema_table)?; + buffer.powersync_crud_manual_put(&table.name, &fragment); + } + } else { + if write == WriteType::Update { + // Updates must not change the id. + buffer.check_id_not_changed(); } + + let json_fragment_new = table_columns_to_json_object("NEW", &as_schema_table)?; + let json_fragment_old = if write == WriteType::Update { + Some(table_columns_to_json_object("OLD", &as_schema_table)?) + } else { + None + }; + + buffer.insert_into_powersync_crud(InsertIntoCrud { + op: write, + table: &as_schema_table, + id_expr: from_fn(|f| { + if write == WriteType::Delete { + f.write_str("OLD.") + } else { + f.write_str("NEW.") + }?; + f.write_str(".id") + }), + type_name: &table.name, + data: Some(from_fn(|f| { + match write { + WriteType::Insert => {} + WriteType::Update => todo!(), + WriteType::Delete => { + // There is no data for deleted rows, don't emit anything. + } + } + + if write == WriteType::Delete { + // There is no data for deleted rows. + return Ok(()); + } + + write!(f, "json(powersync_diff(")?; + + if let Some(ref old) = json_fragment_old { + f.write_str(old)?; + } else { + // We don't have OLD values for inserts, we diff from an empty JSON object + // instead. 
+ f.write_str("'{}'")?; + }; + + write!(f, ", {json_fragment_new}))") + })), + metadata: None::<&'static str>, + })?; } + + buffer.trigger_end(); + Ok(buffer.sql) } diff --git a/crates/core/src/schema/table_info.rs b/crates/core/src/schema/table_info.rs index f92c15d..f91b4b6 100644 --- a/crates/core/src/schema/table_info.rs +++ b/crates/core/src/schema/table_info.rs @@ -3,6 +3,8 @@ use alloc::vec; use alloc::{collections::btree_set::BTreeSet, format, string::String, vec::Vec}; use serde::{Deserialize, de::Visitor}; +use crate::schema::ColumnFilter; + #[derive(Deserialize)] pub struct Table { pub name: String, @@ -11,6 +13,13 @@ pub struct Table { pub columns: Vec, #[serde(default)] pub indexes: Vec, + #[serde(flatten)] + pub options: CommonTableOptions, +} + +/// Options shared between regular and raw tables. +#[derive(Deserialize, Default)] +pub struct CommonTableOptions { #[serde( default, rename = "include_old", @@ -21,21 +30,15 @@ pub struct Table { pub flags: TableInfoFlags, } -#[derive(Deserialize)] +#[derive(Deserialize, Default)] pub struct RawTableSchema { /// The actual name of the raw table in the local schema. /// /// Currently, this is only used to generate `CREATE TRIGGER` statements for the raw table. #[serde(default)] - table_name: Option, - #[serde( - default, - rename = "include_old", - deserialize_with = "deserialize_include_old" - )] - pub diff_include_old: Option, + pub table_name: Option, #[serde(flatten)] - pub flags: TableInfoFlags, + pub options: CommonTableOptions, } #[derive(Deserialize)] @@ -46,7 +49,7 @@ pub struct RawTable { /// This is not necessarily the same as the local name of the raw table. pub name: String, #[serde(flatten, default)] - pub schema: Option, + pub schema: RawTableSchema, pub put: PendingStatement, pub delete: PendingStatement, #[serde(default)] @@ -61,7 +64,7 @@ impl Table { } pub fn local_only(&self) -> bool { - self.flags.local_only() + self.options.flags.local_only() } pub fn internal_name(&self) -> String { @@ -71,23 +74,6 @@ impl Table { format!("ps_data__{:}", self.name) } } - - pub fn filtered_columns<'a>( - &'a self, - names: impl Iterator, - ) -> impl Iterator { - // First, sort all columns by name for faster lookups by name. 
- let mut sorted_by_name: Vec<&Column> = self.columns.iter().collect(); - sorted_by_name.sort_by_key(|c| &*c.name); - - names.filter_map(move |name| { - let index = sorted_by_name - .binary_search_by_key(&name, |c| c.name.as_str()) - .ok()?; - - Some(sorted_by_name[index]) - }) - } } #[derive(Deserialize)] @@ -112,10 +98,19 @@ pub struct IndexedColumn { } pub enum DiffIncludeOld { - OnlyForColumns { columns: Vec }, + OnlyForColumns(ColumnFilter), ForAllColumns, } +impl DiffIncludeOld { + pub fn column_filter(&self) -> Option<&ColumnFilter> { + match self { + Self::ForAllColumns => None, + Self::OnlyForColumns(filter) => Some(filter), + } + } +} + fn deserialize_include_old<'de, D: serde::Deserializer<'de>>( deserializer: D, ) -> Result, D::Error> { @@ -162,7 +157,7 @@ fn deserialize_include_old<'de, D: serde::Deserializer<'de>>( elements.push(next); } - Ok(Some(DiffIncludeOld::OnlyForColumns { columns: elements })) + Ok(Some(DiffIncludeOld::OnlyForColumns(elements.into()))) } } diff --git a/crates/core/src/utils/mod.rs b/crates/core/src/utils/mod.rs index cfafe32..06b7c50 100644 --- a/crates/core/src/utils/mod.rs +++ b/crates/core/src/utils/mod.rs @@ -6,7 +6,7 @@ use alloc::{boxed::Box, string::String}; use powersync_sqlite_nostd::{ColumnType, ManagedStmt}; use serde::Serialize; use serde_json::value::RawValue; -pub use sql_buffer::{InsertIntoCrud, SqlBuffer}; +pub use sql_buffer::{InsertIntoCrud, SqlBuffer, WriteType}; use crate::error::PowerSyncError; use uuid::Uuid; diff --git a/crates/core/src/utils/sql_buffer.rs b/crates/core/src/utils/sql_buffer.rs index 3907a79..c54538f 100644 --- a/crates/core/src/utils/sql_buffer.rs +++ b/crates/core/src/utils/sql_buffer.rs @@ -1,6 +1,13 @@ -use core::fmt::{Display, Write}; +use core::{ + fmt::{Display, Write}, + str::FromStr, +}; -use alloc::string::String; +use alloc::{format, string::String}; + +use crate::{ + error::PowerSyncError, schema::SchemaTable, views::table_columns_to_json_object_with_filter, +}; const DOUBLE_QUOTE: char = '"'; const SINGLE_QUOTE: char = '\''; @@ -69,10 +76,15 @@ impl SqlBuffer { } /// Writes an `INSTEAD OF $write_type ON $on FOR EACH ROW` segment. - pub fn trigger_instead_of(&mut self, write_type: &str, on: &str) { - self.push_str("INSTEAD OF "); - self.push_str(write_type); - self.push_str(" ON "); + pub fn trigger_instead_of(&mut self, write_type: WriteType, on: &str) { + let _ = write!(self, "INSTEAD OF {write_type} ON "); + let _ = self.identifier().write_str(on); + self.push_str(" FOR EACH ROW "); + } + + /// Writes an `INSTEAD OF $write_type ON $on FOR EACH ROW` segment. + pub fn trigger_after(&mut self, write_type: WriteType, on: &str) { + let _ = write!(self, "AFTER {write_type} ON "); let _ = self.identifier().write_str(on); self.push_str(" FOR EACH ROW "); } @@ -96,31 +108,71 @@ impl SqlBuffer { } /// Writes an `INSERT INTO powersync_crud` statement. - pub fn insert_into_powersync_crud( + pub fn insert_into_powersync_crud( &mut self, - insert: InsertIntoCrud, - ) where + insert: InsertIntoCrud, + ) -> Result<(), PowerSyncError> + where Id: Display, Data: Display, - Old: Display, Metadata: Display, { + let old_values = if insert.op == WriteType::Insert { + // Inserts don't have previous values we'd have to track. 
+ None + } else { + let options = insert.table.common_options(); + + match &options.diff_include_old { + None => None, + Some(include_old) => { + let old_values = table_columns_to_json_object_with_filter( + "OLD", + insert.table, + include_old.column_filter(), + )?; + + if options.flags.include_old_only_when_changed() { + // When include_old_only_when_changed is combined with a column filter, make sure we + // only include the powersync_diff of columns matched by the filter. + let filtered_new_fragment = table_columns_to_json_object_with_filter( + "NEW", + insert.table, + include_old.column_filter(), + )?; + + Some(format!( + "json(powersync_diff({filtered_new_fragment}, {old_values}))" + )) + } else { + Some(old_values) + } + } + } + }; + + // Options to ps_crud are only used to conditionally skip empty updates if IGNORE_EMPTY_UPDATE is set. + let options = match insert.op { + WriteType::Update => Some(insert.table.common_options().flags.0), + _ => None, + }; + self.push_str("INSERT INTO powersync_crud(op,id,type"); if insert.data.is_some() { self.push_str(",data"); } - if insert.old_values.is_some() { + if old_values.is_some() { self.push_str(",old_values"); } if insert.metadata.is_some() { self.push_str(",metadata"); } - if insert.options.is_some() { + if options.is_some() { self.push_str(",options"); } self.push_str(") VALUES ("); - let _ = self.string_literal().write_str(insert.op); + let _ = self.string_literal().write_str(insert.op.ps_crud_op_type()); self.comma(); let _ = write!(self, "{}", insert.id_expr); @@ -133,7 +185,7 @@ impl SqlBuffer { let _ = write!(self, "{}", data); } - if let Some(old) = insert.old_values { + if let Some(old) = old_values { self.comma(); let _ = write!(self, "{}", old); } @@ -143,12 +195,24 @@ impl SqlBuffer { let _ = write!(self, "{}", meta); } - if let Some(options) = insert.options { + if let Some(options) = options { self.comma(); let _ = write!(self, "{}", options); } self.push_str(");\n"); + Ok(()) + } + + pub fn powersync_crud_manual_put(&mut self, name: &str, json_fragment: &str) { + self.push_str("INSERT INTO powersync_crud_(data) VALUES(json_object('op', 'PUT', 'type', "); + let _ = self.string_literal().write_str(name); + + let _ = write!( + self, + ", 'id', NEW.id, 'data', json(powersync_diff('{{}}', {:}))));", + json_fragment, + ); } /// Generates a `CAST(json_extract(, "$.") as )` @@ -242,20 +306,63 @@ impl<'a> CommaSeparated<'a> { } } -pub struct InsertIntoCrud<'a, Id, Data, Old, Metadata> +pub struct InsertIntoCrud<'a, Id, Data, Metadata> where Id: Display, Data: Display, - Old: Display, Metadata: Display, { - pub op: &'a str, + pub op: WriteType, pub id_expr: Id, pub type_name: &'a str, pub data: Option, - pub old_values: Option, + pub table: &'a SchemaTable<'a>, pub metadata: Option, - pub options: Option, +} + +#[derive(Clone, Copy, PartialEq)] +pub enum WriteType { + Insert, + Update, + Delete, +} + +impl WriteType { + pub fn ps_crud_op_type(&self) -> &'static str { + match self { + WriteType::Insert => "PUT", + WriteType::Update => "PATCH", + WriteType::Delete => "DELETE", + } + } +} + +impl Display for WriteType { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.write_str(match self { + WriteType::Insert => "INSERT", + WriteType::Update => "UPDATE", + WriteType::Delete => "DELETE", + }) + } +} + +impl FromStr for WriteType { + type Err = PowerSyncError; + + fn from_str(s: &str) -> Result { + Ok(match s { + "INSERT" => Self::Insert, + "UPDATE" => Self::Update, + "DELETE" => Self::Delete, + _ => { + return 
Err(PowerSyncError::argument_error(format!( + "unexpected write type {}", + s + ))); + } + }) + } } #[cfg(test)] diff --git a/crates/core/src/views.rs b/crates/core/src/views.rs index 11de20c..a001c52 100644 --- a/crates/core/src/views.rs +++ b/crates/core/src/views.rs @@ -1,20 +1,19 @@ extern crate alloc; -use alloc::borrow::Cow; use alloc::string::String; -use alloc::{format, vec}; +use alloc::vec; use core::fmt::{Write, from_fn}; use core::mem; use crate::error::PowerSyncError; -use crate::schema::{Column, DiffIncludeOld, Table}; -use crate::utils::{InsertIntoCrud, SqlBuffer}; +use crate::schema::{ColumnFilter, SchemaTable, Table}; +use crate::utils::{InsertIntoCrud, SqlBuffer, WriteType}; pub fn powersync_view_sql(table_info: &Table) -> String { let name = &table_info.name; let view_name = &table_info.view_name(); - let local_only = table_info.flags.local_only(); - let include_metadata = table_info.flags.include_metadata(); + let local_only = table_info.options.flags.local_only(); + let include_metadata = table_info.options.flags.include_metadata(); let mut sql = SqlBuffer::new(); sql.push_str("CREATE VIEW "); @@ -60,74 +59,56 @@ pub fn powersync_view_sql(table_info: &Table) -> String { } pub fn powersync_trigger_delete_sql(table_info: &Table) -> Result { - if table_info.flags.insert_only() { + if table_info.options.flags.insert_only() { // Insert-only tables have no DELETE triggers return Ok(String::new()); } let name = &table_info.name; let view_name = table_info.view_name(); - let local_only = table_info.flags.local_only(); + let local_only = table_info.options.flags.local_only(); + let as_schema_table = SchemaTable::from(table_info); let mut sql = SqlBuffer::new(); sql.create_trigger("ps_view_delete_", view_name); - sql.trigger_instead_of("DELETE", view_name); + sql.trigger_instead_of(WriteType::Delete, view_name); sql.push_str("BEGIN\n"); // First, forward to internal data table. sql.push_str("DELETE FROM "); sql.quote_internal_name(name, local_only); sql.push_str(" WHERE id = OLD.id;\n"); - let old_data_value = match &table_info.diff_include_old { - Some(include_old) => { - let json = match include_old { - DiffIncludeOld::OnlyForColumns { columns } => json_object_fragment( - "OLD", - &mut table_info.filtered_columns(columns.iter().map(|c| c.as_str())), - ), - DiffIncludeOld::ForAllColumns => { - json_object_fragment("OLD", &mut table_info.columns.iter()) - } - }?; - - Some(json) - } - None => None, - }; - if !local_only { // We also need to record the write in powersync_crud. sql.insert_into_powersync_crud(InsertIntoCrud { - op: "DELETE", + op: WriteType::Delete, + table: &as_schema_table, id_expr: "OLD.id", type_name: name, data: None::<&'static str>, - old_values: old_data_value.as_ref(), metadata: None::<&'static str>, - options: None, - }); + })?; - if table_info.flags.include_metadata() { + if table_info.options.flags.include_metadata() { // The DELETE statement can't include metadata for the delete operation, so we create // another trigger to delete with a fake UPDATE syntax. 
sql.trigger_end(); sql.push_str(";\n"); sql.create_trigger("ps_view_delete2_", view_name); - sql.trigger_instead_of("UPDATE", view_name); + sql.trigger_instead_of(WriteType::Update, view_name); sql.push_str("WHEN NEW._deleted IS TRUE BEGIN DELETE FROM "); sql.quote_internal_name(name, local_only); sql.push_str(" WHERE id = OLD.id; "); sql.insert_into_powersync_crud(InsertIntoCrud { - op: "DELETE", + op: WriteType::Delete, + table: &as_schema_table, id_expr: "OLD.id", type_name: name, data: None::<&'static str>, - old_values: old_data_value.as_ref(), metadata: Some("NEW._metadata"), - options: None, - }); + })?; } } @@ -138,31 +119,25 @@ pub fn powersync_trigger_delete_sql(table_info: &Table) -> Result Result { let name = &table_info.name; let view_name = table_info.view_name(); - let local_only = table_info.flags.local_only(); - let insert_only = table_info.flags.insert_only(); + let local_only = table_info.options.flags.local_only(); + let insert_only = table_info.options.flags.insert_only(); + let as_schema_table = SchemaTable::from(table_info); let mut sql = SqlBuffer::new(); sql.create_trigger("ps_view_insert_", view_name); - sql.trigger_instead_of("INSERT", view_name); + sql.trigger_instead_of(WriteType::Insert, view_name); sql.push_str("BEGIN\n"); if !local_only { sql.check_id_valid(); } - let json_fragment = json_object_fragment("NEW", &mut table_info.columns.iter())?; + let json_fragment = table_columns_to_json_object("NEW", &as_schema_table)?; if insert_only { // This is using the manual powersync_crud_ instead of powersync_crud because insert-only // writes shouldn't prevent us from receiving new data. - sql.push_str("INSERT INTO powersync_crud_(data) VALUES(json_object('op', 'PUT', 'type', "); - let _ = sql.string_literal().write_str(name); - - let _ = write!( - &mut sql, - ", 'id', NEW.id, 'data', json(powersync_diff('{{}}', {:}))));", - json_fragment, - ); + sql.powersync_crud_manual_put(name, &json_fragment); } else { // Insert into the underlying data table. sql.push_str("INSERT INTO "); @@ -172,20 +147,19 @@ pub fn powersync_trigger_insert_sql(table_info: &Table) -> Result, - metadata: if table_info.flags.include_metadata() { + metadata: if table_info.options.flags.include_metadata() { Some("NEW._metadata") } else { None }, - options: None, - }); + })?; } } @@ -194,29 +168,30 @@ pub fn powersync_trigger_insert_sql(table_info: &Table) -> Result Result { - if table_info.flags.insert_only() { + if table_info.options.flags.insert_only() { // Insert-only tables have no UPDATE triggers return Ok(String::new()); } let name = &table_info.name; let view_name = table_info.view_name(); - let local_only = table_info.flags.local_only(); + let local_only = table_info.options.flags.local_only(); + let as_schema_table = SchemaTable::from(table_info); let mut sql = SqlBuffer::new(); sql.create_trigger("ps_view_update_", view_name); - sql.trigger_instead_of("UPDATE", view_name); + sql.trigger_instead_of(WriteType::Update, view_name); // If we're supposed to include metadata, we support UPDATE ... SET _deleted = TRUE with // another trigger (because there's no way to attach data to DELETE statements otherwise). 
- if table_info.flags.include_metadata() { + if table_info.options.flags.include_metadata() { sql.push_str(" WHEN NEW._deleted IS NOT TRUE "); } sql.push_str("BEGIN\n"); sql.check_id_not_changed(); - let json_fragment_new = json_object_fragment("NEW", &mut table_info.columns.iter())?; - let json_fragment_old = json_object_fragment("OLD", &mut table_info.columns.iter())?; + let json_fragment_new = table_columns_to_json_object("NEW", &as_schema_table)?; + let json_fragment_old = table_columns_to_json_object("OLD", &as_schema_table)?; // UPDATE {internal_name} SET data = {json_fragment_new} WHERE id = NEW.id; sql.push_str("UPDATE "); @@ -227,43 +202,11 @@ pub fn powersync_trigger_update_sql(table_info: &Table) -> Result None, - Some(DiffIncludeOld::ForAllColumns) => Some(json_fragment_old.clone()), - Some(DiffIncludeOld::OnlyForColumns { columns }) => Some(json_object_fragment( - "OLD", - &mut table_info.filtered_columns(columns.iter().map(|c| c.as_str())), - )?), - }; - - if table_info.flags.include_old_only_when_changed() { - old_values_fragment = match old_values_fragment { - None => None, - Some(f) => { - let filtered_new_fragment = match &table_info.diff_include_old { - // When include_old_only_when_changed is combined with a column filter, make sure we - // only include the powersync_diff of columns matched by the filter. - Some(DiffIncludeOld::OnlyForColumns { columns }) => { - Cow::Owned(json_object_fragment( - "NEW", - &mut table_info - .filtered_columns(columns.iter().map(|c| c.as_str())), - )?) - } - _ => Cow::Borrowed(json_fragment_new.as_str()), - }; - - Some(format!( - "json(powersync_diff({filtered_new_fragment}, {f}))" - )) - } - } - } - // Also forward write to powersync_crud vtab. sql.insert_into_powersync_crud(InsertIntoCrud { - op: "PATCH", + op: WriteType::Update, id_expr: "NEW.id", + table: &as_schema_table, type_name: name, data: Some(from_fn(|f| { write!( @@ -271,14 +214,12 @@ pub fn powersync_trigger_update_sql(table_info: &Table) -> Result Result( +pub fn table_columns_to_json_object<'a>( + prefix: &str, + table: &'a SchemaTable<'a>, +) -> Result { + table_columns_to_json_object_with_filter(prefix, table, None) +} + +pub fn table_columns_to_json_object_with_filter<'a>( prefix: &str, - columns: &mut dyn Iterator, + table: &'a SchemaTable<'a>, + filter: Option<&'a ColumnFilter>, ) -> Result { // floor(SQLITE_MAX_FUNCTION_ARG / 2). // To keep databases portable, we use the default limit of 100 args for this, @@ -313,7 +262,14 @@ fn json_object_fragment<'a>( buffer.sql } - while let Some(column) = columns.next() { + let mut columns = table.column_names(); + while let Some(name) = columns.next() { + if let Some(filter) = filter + && !filter.matches(name) + { + continue; + } + total_columns += 1; // SQLITE_MAX_COLUMN - 1 (because of the id column) if total_columns > 1999 { @@ -322,8 +278,6 @@ fn json_object_fragment<'a>( )); } - let name = &*column.name; - let pending_object = pending_json_object.get_or_insert_with(new_pending_object); if pending_object.0 == MAX_ARG_COUNT { // We already have 50 key-value pairs in this call, finish. 
@@ -382,10 +336,10 @@ mod test { use alloc::{string::ToString, vec}; use crate::{ - schema::{Column, Table, TableInfoFlags}, + schema::{Column, Table}, views::{ - json_object_fragment, powersync_trigger_delete_sql, powersync_trigger_insert_sql, - powersync_trigger_update_sql, powersync_view_sql, + powersync_trigger_delete_sql, powersync_trigger_insert_sql, + powersync_trigger_update_sql, powersync_view_sql, table_columns_to_json_object, }, }; @@ -404,15 +358,14 @@ mod test { }, ], indexes: vec![], - diff_include_old: None, - flags: TableInfoFlags::default(), + options: Default::default(), }; } #[test] fn test_json_object_fragment() { let fragment = - json_object_fragment("NEW", &mut test_table().columns.iter()).expect("should generate"); + table_columns_to_json_object("NEW", &(&test_table()).into()).expect("should generate"); assert_eq!( fragment, @@ -446,7 +399,7 @@ END"# #[test] fn local_only_does_not_write_into_ps_crud() { let mut table = test_table(); - table.flags.0 = 1; // local-only bit + table.options.flags.0 = 1; // local-only bit assert!( !powersync_trigger_insert_sql(&table) From 672264c23c648ab32b83e46e7b68de4e884d4cac Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 11 Feb 2026 11:05:16 +0100 Subject: [PATCH 05/10] Remove generics on InsertIntoCrud --- crates/core/src/schema/mod.rs | 2 +- crates/core/src/schema/raw_table.rs | 15 ++++++--------- crates/core/src/utils/sql_buffer.rs | 24 +++++++----------------- crates/core/src/views.rs | 8 ++++---- 4 files changed, 18 insertions(+), 31 deletions(-) diff --git a/crates/core/src/schema/mod.rs b/crates/core/src/schema/mod.rs index 8082d06..ca8e816 100644 --- a/crates/core/src/schema/mod.rs +++ b/crates/core/src/schema/mod.rs @@ -66,7 +66,7 @@ pub fn register(db: *mut sqlite::sqlite3, state: Rc) -> Result<() Some(create_raw_trigger_sqlite), None, None, - Some(DatabaseState::destroy_rc), + None, )?; } Ok(()) diff --git a/crates/core/src/schema/raw_table.rs b/crates/core/src/schema/raw_table.rs index a1e60e4..d0fb1d9 100644 --- a/crates/core/src/schema/raw_table.rs +++ b/crates/core/src/schema/raw_table.rs @@ -108,16 +108,13 @@ pub fn generate_raw_table_trigger( buffer.insert_into_powersync_crud(InsertIntoCrud { op: write, table: &as_schema_table, - id_expr: from_fn(|f| { - if write == WriteType::Delete { - f.write_str("OLD.") - } else { - f.write_str("NEW.") - }?; - f.write_str(".id") - }), + id_expr: if write == WriteType::Delete { + "OLD.id" + } else { + "NEW.id" + }, type_name: &table.name, - data: Some(from_fn(|f| { + data: Some(&from_fn(|f| { match write { WriteType::Insert => {} WriteType::Update => todo!(), diff --git a/crates/core/src/utils/sql_buffer.rs b/crates/core/src/utils/sql_buffer.rs index c54538f..4275df3 100644 --- a/crates/core/src/utils/sql_buffer.rs +++ b/crates/core/src/utils/sql_buffer.rs @@ -108,15 +108,10 @@ impl SqlBuffer { } /// Writes an `INSERT INTO powersync_crud` statement. - pub fn insert_into_powersync_crud( + pub fn insert_into_powersync_crud( &mut self, - insert: InsertIntoCrud, - ) -> Result<(), PowerSyncError> - where - Id: Display, - Data: Display, - Metadata: Display, - { + insert: InsertIntoCrud, + ) -> Result<(), PowerSyncError> { let old_values = if insert.op == WriteType::Insert { // Inserts don't have previous values we'd have to track. 
None @@ -306,18 +301,13 @@ impl<'a> CommaSeparated<'a> { } } -pub struct InsertIntoCrud<'a, Id, Data, Metadata> -where - Id: Display, - Data: Display, - Metadata: Display, -{ +pub struct InsertIntoCrud<'a> { pub op: WriteType, - pub id_expr: Id, + pub id_expr: &'a str, pub type_name: &'a str, - pub data: Option, + pub data: Option<&'a dyn Display>, pub table: &'a SchemaTable<'a>, - pub metadata: Option, + pub metadata: Option<&'a str>, } #[derive(Clone, Copy, PartialEq)] diff --git a/crates/core/src/views.rs b/crates/core/src/views.rs index a001c52..d87cdd0 100644 --- a/crates/core/src/views.rs +++ b/crates/core/src/views.rs @@ -85,7 +85,7 @@ pub fn powersync_trigger_delete_sql(table_info: &Table) -> Result, + data: None, metadata: None::<&'static str>, })?; @@ -106,7 +106,7 @@ pub fn powersync_trigger_delete_sql(table_info: &Table) -> Result, + data: None, metadata: Some("NEW._metadata"), })?; } @@ -151,7 +151,7 @@ pub fn powersync_trigger_insert_sql(table_info: &Table) -> Result Result Date: Wed, 11 Feb 2026 12:05:02 +0100 Subject: [PATCH 06/10] Add tests for created triggers --- crates/core/src/schema/common.rs | 3 + crates/core/src/schema/mod.rs | 6 +- crates/core/src/schema/raw_table.rs | 47 ++++---- crates/core/src/utils/sql_buffer.rs | 4 +- dart/test/crud_test.dart | 172 ++++++++++++++++++++++++++++ 5 files changed, 202 insertions(+), 30 deletions(-) diff --git a/crates/core/src/schema/common.rs b/crates/core/src/schema/common.rs index aab38cb..cd2a0e5 100644 --- a/crates/core/src/schema/common.rs +++ b/crates/core/src/schema/common.rs @@ -6,6 +6,8 @@ use crate::schema::{ Column, CommonTableOptions, RawTable, Table, raw_table::InferredTableStructure, }; +/// Utility to wrap both PowerSync-managed JSON tables and raw tables (with their schema snapshot +/// inferred from reading `pragma_table_info`) into a common implementation. pub enum SchemaTable<'a> { Json(&'a Table), Raw { @@ -25,6 +27,7 @@ impl<'a> SchemaTable<'a> { } } + /// Iterates over defined column names in this table (not including the `id` column). 
pub fn column_names(&self) -> impl Iterator { match self { Self::Json(table) => SchemaTableColumnIterator::Json(table.columns.iter()), diff --git a/crates/core/src/schema/mod.rs b/crates/core/src/schema/mod.rs index ca8e816..1aa3c6e 100644 --- a/crates/core/src/schema/mod.rs +++ b/crates/core/src/schema/mod.rs @@ -15,7 +15,9 @@ pub use table_info::{ }; use crate::{ - error::PowerSyncError, schema::raw_table::generate_raw_table_trigger, state::DatabaseState, + error::{PSResult, PowerSyncError}, + schema::raw_table::generate_raw_table_trigger, + state::DatabaseState, utils::WriteType, }; @@ -43,7 +45,7 @@ pub fn register(db: *mut sqlite::sqlite3, state: Rc) -> Result<() let db = context.db_handle(); let create_trigger_stmt = generate_raw_table_trigger(db, &table, trigger_name, write_type)?; - db.exec_safe(&create_trigger_stmt)?; + db.exec_safe(&create_trigger_stmt).into_db_result(db)?; Ok(()) } diff --git a/crates/core/src/schema/raw_table.rs b/crates/core/src/schema/raw_table.rs index d0fb1d9..f7be256 100644 --- a/crates/core/src/schema/raw_table.rs +++ b/crates/core/src/schema/raw_table.rs @@ -1,4 +1,4 @@ -use core::fmt::from_fn; +use core::fmt::{self, Formatter, from_fn}; use alloc::{ format, @@ -105,6 +105,20 @@ pub fn generate_raw_table_trigger( None }; + let write_data = from_fn(|f: &mut Formatter| -> fmt::Result { + write!(f, "json(powersync_diff(")?; + + if let Some(ref old) = json_fragment_old { + f.write_str(old)?; + } else { + // We don't have OLD values for inserts, we diff from an empty JSON object + // instead. + f.write_str("'{}'")?; + }; + + write!(f, ", {json_fragment_new}))") + }); + buffer.insert_into_powersync_crud(InsertIntoCrud { op: write, table: &as_schema_table, @@ -114,32 +128,11 @@ pub fn generate_raw_table_trigger( "NEW.id" }, type_name: &table.name, - data: Some(&from_fn(|f| { - match write { - WriteType::Insert => {} - WriteType::Update => todo!(), - WriteType::Delete => { - // There is no data for deleted rows, don't emit anything. - } - } - - if write == WriteType::Delete { - // There is no data for deleted rows. - return Ok(()); - } - - write!(f, "json(powersync_diff(")?; - - if let Some(ref old) = json_fragment_old { - f.write_str(old)?; - } else { - // We don't have OLD values for inserts, we diff from an empty JSON object - // instead. - f.write_str("'{}'")?; - }; - - write!(f, ", {json_fragment_new}))") - })), + data: match write { + // There is no data for deleted rows. + WriteType::Delete => None, + _ => Some(&write_data), + }, metadata: None::<&'static str>, })?; } diff --git a/crates/core/src/utils/sql_buffer.rs b/crates/core/src/utils/sql_buffer.rs index 4275df3..2e8d2ab 100644 --- a/crates/core/src/utils/sql_buffer.rs +++ b/crates/core/src/utils/sql_buffer.rs @@ -127,7 +127,9 @@ impl SqlBuffer { include_old.column_filter(), )?; - if options.flags.include_old_only_when_changed() { + if insert.op == WriteType::Update + && options.flags.include_old_only_when_changed() + { // When include_old_only_when_changed is combined with a column filter, make sure we // only include the powersync_diff of columns matched by the filter. 
let filtered_new_fragment = table_columns_to_json_object_with_filter( diff --git a/dart/test/crud_test.dart b/dart/test/crud_test.dart index 834301f..2f19a9b 100644 --- a/dart/test/crud_test.dart +++ b/dart/test/crud_test.dart @@ -4,6 +4,7 @@ import 'package:sqlite3/common.dart'; import 'package:test/test.dart'; import 'utils/native_test_utils.dart'; +import 'utils/test_utils.dart'; void main() { group('crud tests', () { @@ -774,5 +775,176 @@ void main() { expect(db.select('SELECT * FROM ps_crud'), isEmpty); }); }); + + group('raw tables', () { + void createRawTableTriggers(Object table, + {bool insert = true, bool update = true, bool delete = true}) { + db.execute('SELECT powersync_init()'); + + if (insert) { + db.execute('SELECT powersync_create_raw_table_crud_trigger(?, ?, ?)', + [json.encode(table), 'test_trigger_insert', 'INSERT']); + } + if (update) { + db.execute('SELECT powersync_create_raw_table_crud_trigger(?, ?, ?)', + [json.encode(table), 'test_trigger_update', 'UPDATE']); + } + if (delete) { + db.execute('SELECT powersync_create_raw_table_crud_trigger(?, ?, ?)', + [json.encode(table), 'test_trigger_delete', 'DELETE']); + } + } + + Object rawTableDescription(Map options) { + return { + 'name': 'row_type', + 'put': {'sql': '', 'params': []}, + 'delete': {'sql': '', 'params': []}, + ...options, + }; + } + + test('missing id column', () { + db.execute('CREATE TABLE users (name TEXT);'); + expect( + () => createRawTableTriggers( + rawTableDescription({'table_name': 'users'})), + throwsA(isSqliteException( + 3091, contains('Table users has no id column'))), + ); + }); + + test('missing local table name', () { + db.execute('CREATE TABLE users (name TEXT);'); + expect( + () => createRawTableTriggers(rawTableDescription({})), + throwsA(isSqliteException(3091, contains('Table has no local name'))), + ); + }); + + test('missing local table', () { + expect( + () => createRawTableTriggers( + rawTableDescription({'table_name': 'users'})), + throwsA(isSqliteException( + 3091, contains('Could not find users in local schema'))), + ); + }); + + test('default options', () { + db.execute('CREATE TABLE users (id TEXT, name TEXT) STRICT;'); + createRawTableTriggers(rawTableDescription({'table_name': 'users'})); + + db + ..execute( + 'INSERT INTO users (id, name) VALUES (?, ?)', ['id', 'name']) + ..execute('UPDATE users SET name = ?', ['new name']) + ..execute('DELETE FROM users WHERE id = ?', ['id']); + + final psCrud = db.select('SELECT * FROM ps_crud'); + expect(psCrud, [ + { + 'id': 1, + 'tx_id': 1, + 'data': json.encode({ + 'op': 'PUT', + 'id': 'id', + 'type': 'row_type', + 'data': {'name': 'name'} + }), + }, + { + 'id': 2, + 'tx_id': 2, + 'data': json.encode({ + 'op': 'PATCH', + 'id': 'id', + 'type': 'row_type', + 'data': {'name': 'new name'} + }), + }, + { + 'id': 3, + 'tx_id': 3, + 'data': + json.encode({'op': 'DELETE', 'id': 'id', 'type': 'row_type'}), + }, + ]); + }); + + test('insert only', () { + db.execute('CREATE TABLE users (id TEXT, name TEXT) STRICT;'); + createRawTableTriggers( + rawTableDescription({'table_name': 'users', 'insert_only': true})); + + db.execute( + 'INSERT INTO users (id, name) VALUES (?, ?)', ['id', 'name']); + expect(db.select('SELECT * FROM ps_crud'), hasLength(1)); + + // Should not update the $local bucket + expect(db.select('SELECT * FROM ps_buckets'), hasLength(0)); + + // The trigger should prevent other writes. 
+ expect( + () => db.execute('UPDATE users SET name = ?', ['new name']), + throwsA(isSqliteException( + 1811, contains('Unexpected update on insert-only table')))); + expect( + () => db.execute('DELETE FROM users WHERE id = ?', ['id']), + throwsA(isSqliteException( + 1811, contains('Unexpected update on insert-only table')))); + }); + + test('tracking old values', () { + db.execute( + 'CREATE TABLE users (id TEXT, name TEXT, email TEXT) STRICT;'); + createRawTableTriggers(rawTableDescription({ + 'table_name': 'users', + 'include_old': ['name'], + 'include_old_only_when_changed': true, + })); + + db + ..execute('INSERT INTO users (id, name, email) VALUES (?, ?, ?)', + ['id', 'name', 'test@example.org']) + ..execute('UPDATE users SET name = ?, email = ?', + ['new name', 'newmail@example.org']) + ..execute('DELETE FROM users WHERE id = ?', ['id']); + + final psCrud = db.select( + r"SELECT id, data->>'$.op' AS op, data->>'$.old' as old FROM ps_crud"); + expect(psCrud, [ + { + 'id': 1, + 'op': 'PUT', + 'old': null, + }, + { + 'id': 2, + 'op': 'PATCH', + 'old': json.encode({'name': 'name'}), + }, + { + 'id': 3, + 'op': 'DELETE', + 'old': json.encode({'name': 'new name'}), + }, + ]); + }); + + test('skipping empty updates', () { + db.execute('CREATE TABLE users (id TEXT, name TEXT) STRICT;'); + createRawTableTriggers(rawTableDescription( + {'table_name': 'users', 'ignore_empty_update': true})); + + db.execute( + 'INSERT INTO users (id, name) VALUES (?, ?)', ['id', 'name']); + expect(db.select('SELECT * FROM ps_crud'), hasLength(1)); + + // Empty update should not be recorded + db.execute('UPDATE users SET name = ?', ['name']); + expect(db.select('SELECT * FROM ps_crud'), hasLength(1)); + }); + }); }); } From 8526649b2067b8cb653647b70e78ba8ab07b39f5 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 11 Feb 2026 12:19:11 +0100 Subject: [PATCH 07/10] Also test trigger in sync client --- dart/test/sync_test.dart | 70 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/dart/test/sync_test.dart b/dart/test/sync_test.dart index bf412ab..5be01fd 100644 --- a/dart/test/sync_test.dart +++ b/dart/test/sync_test.dart @@ -1170,6 +1170,76 @@ CREATE TRIGGER users_delete expect(db.select('SELECT * FROM users'), isEmpty); }); + syncTest('default trigger smoke test', (_) { + db.execute( + 'CREATE TABLE local_users (id TEXT NOT NULL PRIMARY KEY, name TEXT NOT NULL) STRICT;'); + final table = { + 'name': 'users', + 'table_name': 'local_users', + // This also tests that the trigger preventing updates and deletes on + // insert-only tables is inert during sync_local. 
+ 'insert_only': true, + 'put': { + 'sql': 'INSERT OR REPLACE INTO local_users (id, name) VALUES (?, ?);', + 'params': [ + 'Id', + {'Column': 'name'} + ], + }, + 'delete': { + 'sql': 'DELETE FROM local_users WHERE id = ?', + 'params': ['Id'], + }, + 'clear': 'DELETE FROM local_users;', + }; + db.execute(''' +SELECT + powersync_create_raw_table_crud_trigger(?1, 'test_insert', 'INSERT'), + powersync_create_raw_table_crud_trigger(?1, 'test_update', 'UPDATE'), + powersync_create_raw_table_crud_trigger(?1, 'test_delete', 'DELETE') +''', [json.encode(table)]); + + invokeControl( + 'start', + json.encode({ + 'schema': { + 'raw_tables': [table], + 'tables': [], + } + })); + + // Insert + pushCheckpoint(buckets: [bucketDescription('a')]); + pushSyncData( + 'a', + '1', + 'my_user', + 'PUT', + {'name': 'First user'}, + objectType: 'users', + ); + pushCheckpointComplete(); + + final users = db.select('SELECT * FROM local_users;'); + expect(users, [ + {'id': 'my_user', 'name': 'First user'} + ]); + + // Delete + pushCheckpoint(buckets: [bucketDescription('a')]); + pushSyncData( + 'a', + '1', + 'my_user', + 'REMOVE', + null, + objectType: 'users', + ); + pushCheckpointComplete(); + + expect(db.select('SELECT * FROM local_users'), isEmpty); + }); + test('reports errors from underlying statements', () { setupRawTables(); invokeControl('start', json.encode({'schema': schema})); From cea56a8d1f45fe92077614a23a740b65870e9a75 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 11 Feb 2026 12:33:33 +0100 Subject: [PATCH 08/10] Fix typo in documentation comment --- crates/core/src/utils/sql_buffer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/src/utils/sql_buffer.rs b/crates/core/src/utils/sql_buffer.rs index 2e8d2ab..6a9c97d 100644 --- a/crates/core/src/utils/sql_buffer.rs +++ b/crates/core/src/utils/sql_buffer.rs @@ -82,7 +82,7 @@ impl SqlBuffer { self.push_str(" FOR EACH ROW "); } - /// Writes an `INSTEAD OF $write_type ON $on FOR EACH ROW` segment. + /// Writes an `AFTER $write_type ON $on FOR EACH ROW` segment. 
pub fn trigger_after(&mut self, write_type: WriteType, on: &str) { let _ = write!(self, "AFTER {write_type} ON "); let _ = self.identifier().write_str(on); From 58d22effa36a0547e121d810173121bc7d64d344 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 11 Feb 2026 14:06:16 +0100 Subject: [PATCH 09/10] Add a few golden tests --- dart/test/schema_test.dart | 121 +++++++++++++++++++++++++++++++++++++ 1 file changed, 121 insertions(+) diff --git a/dart/test/schema_test.dart b/dart/test/schema_test.dart index 357468f..772e42a 100644 --- a/dart/test/schema_test.dart +++ b/dart/test/schema_test.dart @@ -218,9 +218,130 @@ void main() { isEmpty, ); }); + + group('triggers for raw tables', () { + const createUsers = + 'CREATE TABLE users (id TEXT, email TEXT, email_verified INTEGER);'; + + const testCases = <_RawTableTestCase>[ + // Default options + _RawTableTestCase( + createTable: createUsers, + tableOptions: {'table_name': 'users'}, + insert: ''' +CREATE TRIGGER "test_insert" AFTER INSERT ON "users" FOR EACH ROW WHEN NOT powersync_in_sync_operation() BEGIN +INSERT INTO powersync_crud(op,id,type,data) VALUES ('PUT', NEW.id, 'sync_type', json(powersync_diff('{}', json_object('email', powersync_strip_subtype(NEW."email"), 'email_verified', powersync_strip_subtype(NEW."email_verified"))))); +END''', + update: ''' +CREATE TRIGGER "test_update" AFTER UPDATE ON "users" FOR EACH ROW WHEN NOT powersync_in_sync_operation() BEGIN +SELECT CASE WHEN (OLD.id != NEW.id) THEN RAISE (FAIL, 'Cannot update id') END; +INSERT INTO powersync_crud(op,id,type,data,options) VALUES ('PATCH', NEW.id, 'sync_type', json(powersync_diff(json_object('email', powersync_strip_subtype(OLD."email"), 'email_verified', powersync_strip_subtype(OLD."email_verified")), json_object('email', powersync_strip_subtype(NEW."email"), 'email_verified', powersync_strip_subtype(NEW."email_verified")))), 0); +END''', + delete: ''' +CREATE TRIGGER "test_delete" AFTER DELETE ON "users" FOR EACH ROW WHEN NOT powersync_in_sync_operation() BEGIN +INSERT INTO powersync_crud(op,id,type) VALUES ('DELETE', OLD.id, 'sync_type'); +END''', + ), + // Insert-only + _RawTableTestCase( + createTable: createUsers, + tableOptions: { + 'table_name': 'users', + 'insert_only': true, + }, + insert: ''' +CREATE TRIGGER "test_insert" AFTER INSERT ON "users" FOR EACH ROW WHEN NOT powersync_in_sync_operation() BEGIN +INSERT INTO powersync_crud_(data) VALUES(json_object('op', 'PUT', 'type', 'sync_type', 'id', NEW.id, 'data', json(powersync_diff('{}', json_object('email', powersync_strip_subtype(NEW."email"), 'email_verified', powersync_strip_subtype(NEW."email_verified"))))));END''', + update: ''' +CREATE TRIGGER "test_update" AFTER UPDATE ON "users" FOR EACH ROW WHEN NOT powersync_in_sync_operation() BEGIN +SELECT RAISE(FAIL, 'Unexpected update on insert-only table'); +END''', + delete: ''' +CREATE TRIGGER "test_delete" AFTER DELETE ON "users" FOR EACH ROW WHEN NOT powersync_in_sync_operation() BEGIN +SELECT RAISE(FAIL, 'Unexpected update on insert-only table'); +END''', + ), + // Tracking old values + _RawTableTestCase( + createTable: createUsers, + tableOptions: { + 'table_name': 'users', + 'include_old': true, + }, + insert: ''' +CREATE TRIGGER "test_insert" AFTER INSERT ON "users" FOR EACH ROW WHEN NOT powersync_in_sync_operation() BEGIN +INSERT INTO powersync_crud(op,id,type,data) VALUES ('PUT', NEW.id, 'sync_type', json(powersync_diff('{}', json_object('email', powersync_strip_subtype(NEW."email"), 'email_verified', 
powersync_strip_subtype(NEW."email_verified"))))); +END''', + update: ''' +CREATE TRIGGER "test_update" AFTER UPDATE ON "users" FOR EACH ROW WHEN NOT powersync_in_sync_operation() BEGIN +SELECT CASE WHEN (OLD.id != NEW.id) THEN RAISE (FAIL, 'Cannot update id') END; +INSERT INTO powersync_crud(op,id,type,data,old_values,options) VALUES ('PATCH', NEW.id, 'sync_type', json(powersync_diff(json_object('email', powersync_strip_subtype(OLD."email"), 'email_verified', powersync_strip_subtype(OLD."email_verified")), json_object('email', powersync_strip_subtype(NEW."email"), 'email_verified', powersync_strip_subtype(NEW."email_verified")))), json_object('email', powersync_strip_subtype(OLD."email"), 'email_verified', powersync_strip_subtype(OLD."email_verified")), 0); +END''', + delete: ''' +CREATE TRIGGER "test_delete" AFTER DELETE ON "users" FOR EACH ROW WHEN NOT powersync_in_sync_operation() BEGIN +INSERT INTO powersync_crud(op,id,type,old_values) VALUES ('DELETE', OLD.id, 'sync_type', json_object('email', powersync_strip_subtype(OLD."email"), 'email_verified', powersync_strip_subtype(OLD."email_verified"))); +END''', + ), + ]; + + for (final (i, testCase) in testCases.indexed) { + test('#$i', () => testCase.testWith(db)); + } + }); }); } +final class _RawTableTestCase { + final String createTable; + final Map tableOptions; + final String insert, update, delete; + + const _RawTableTestCase({ + required this.createTable, + required this.tableOptions, + required this.insert, + required this.update, + required this.delete, + }); + + void testWith(CommonDatabase db) { + db.execute(createTable); + db.execute(''' +SELECT + powersync_create_raw_table_crud_trigger(?1, 'test_insert', 'INSERT'), + powersync_create_raw_table_crud_trigger(?1, 'test_update', 'UPDATE'), + powersync_create_raw_table_crud_trigger(?1, 'test_delete', 'DELETE') +''', [ + json.encode({ + 'name': 'sync_type', + 'put': { + 'sql': 'unused', + 'params': [], + }, + 'delete': { + 'sql': 'unused', + 'params': [], + }, + ...tableOptions, + }) + ]); + + final foundTriggers = + db.select("SELECT name, sql FROM sqlite_schema WHERE type = 'trigger'"); + + // Uncomment to help update expectations + // for (final row in foundTriggers) { + // print(row['sql']); + // } + + expect(foundTriggers, [ + {'name': 'test_insert', 'sql': insert}, + {'name': 'test_update', 'sql': update}, + {'name': 'test_delete', 'sql': delete}, + ]); + } +} + final schema = { "tables": [ { From 31d90dd623126fc1c5d53f8067c1de293a132c11 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 11 Feb 2026 14:41:08 +0100 Subject: [PATCH 10/10] Add local-only columns --- crates/core/src/schema/common.rs | 17 ++++++++++++++ crates/core/src/schema/raw_table.rs | 33 +++++++++++++++++++++++----- crates/core/src/schema/table_info.rs | 2 ++ dart/test/schema_test.dart | 27 +++++++++++++++++++++-- 4 files changed, 72 insertions(+), 7 deletions(-) diff --git a/crates/core/src/schema/common.rs b/crates/core/src/schema/common.rs index cd2a0e5..f133d44 100644 --- a/crates/core/src/schema/common.rs +++ b/crates/core/src/schema/common.rs @@ -1,6 +1,7 @@ use core::slice; use alloc::{string::String, vec::Vec}; +use serde::Deserialize; use crate::schema::{ Column, CommonTableOptions, RawTable, Table, raw_table::InferredTableStructure, @@ -61,6 +62,7 @@ impl<'a> Iterator for SchemaTableColumnIterator<'a> { } } +#[derive(Default)] pub struct ColumnFilter { sorted_names: Vec, } @@ -82,3 +84,18 @@ impl ColumnFilter { .is_ok() } } + +impl AsRef> for ColumnFilter { + fn as_ref(&self) -> &Vec { + 
&self.sorted_names
+    }
+}
+
+impl<'de> Deserialize<'de> for ColumnFilter {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        Ok(Self::from(Vec::<String>::deserialize(deserializer)?))
+    }
+}
diff --git a/crates/core/src/schema/raw_table.rs b/crates/core/src/schema/raw_table.rs
index f7be256..bd9c9f2 100644
--- a/crates/core/src/schema/raw_table.rs
+++ b/crates/core/src/schema/raw_table.rs
@@ -1,4 +1,4 @@
-use core::fmt::{self, Formatter, from_fn};
+use core::fmt::{self, Formatter, Write, from_fn};
 
 use alloc::{
     format,
@@ -10,7 +10,7 @@ use powersync_sqlite_nostd::{Connection, Destructor, ResultCode};
 
 use crate::{
     error::PowerSyncError,
-    schema::{RawTable, SchemaTable},
+    schema::{ColumnFilter, RawTable, SchemaTable},
     utils::{InsertIntoCrud, SqlBuffer, WriteType},
     views::table_columns_to_json_object,
 };
@@ -23,6 +23,7 @@ impl InferredTableStructure {
     pub fn read_from_database(
         table_name: &str,
         db: impl Connection,
+        ignored_local_columns: &ColumnFilter,
     ) -> Result<Option<Self>, PowerSyncError> {
         let stmt = db.prepare_v2("select name from pragma_table_info(?)")?;
         stmt.bind_text(1, table_name, Destructor::STATIC)?;
@@ -34,7 +35,7 @@ impl InferredTableStructure {
             let name = stmt.column_text(0)?;
             if name == "id" {
                 has_id_column = true;
-            } else {
+            } else if !ignored_local_columns.matches(name) {
                 columns.push(name.to_string());
             }
         }
@@ -63,7 +64,9 @@ pub fn generate_raw_table_trigger(
         return Err(PowerSyncError::argument_error("Table has no local name"));
     };
 
-    let Some(resolved_table) = InferredTableStructure::read_from_database(local_table_name, db)?
+    let local_only_columns = &table.schema.local_only_columns;
+    let Some(resolved_table) =
+        InferredTableStructure::read_from_database(local_table_name, db, local_only_columns)?
     else {
         return Err(PowerSyncError::argument_error(format!(
             "Could not find {} in local schema",
@@ -80,7 +83,27 @@ pub fn generate_raw_table_trigger(
     buffer.create_trigger("", trigger_name);
     buffer.trigger_after(write, local_table_name);
     // Skip the trigger for writes during sync_local, these aren't crud writes.
-    buffer.push_str("WHEN NOT powersync_in_sync_operation() BEGIN\n");
+    buffer.push_str("WHEN NOT powersync_in_sync_operation()");
+
+    if write == WriteType::Update && !local_only_columns.as_ref().is_empty() {
+        buffer.push_str(" AND\n(");
+        // If we have local-only columns, we want to add additional WHEN clauses to ensure the
+        // trigger runs for updates on synced columns.
+        for (i, name) in as_schema_table.column_names().enumerate() {
+            if i != 0 {
+                buffer.push_str(" OR ");
+            }
+
+            // Generate OLD."column" IS NOT NEW."column"
+            buffer.push_str("OLD.");
+            let _ = buffer.identifier().write_str(name);
+            buffer.push_str(" IS NOT NEW.");
+            let _ = buffer.identifier().write_str(name);
+        }
+        buffer.push_str(")");
+    }
+
+    buffer.push_str(" BEGIN\n");
 
     if table.schema.options.flags.insert_only() {
         if write != WriteType::Insert {
diff --git a/crates/core/src/schema/table_info.rs b/crates/core/src/schema/table_info.rs
index f91b4b6..20ff72a 100644
--- a/crates/core/src/schema/table_info.rs
+++ b/crates/core/src/schema/table_info.rs
@@ -37,6 +37,8 @@ pub struct RawTableSchema {
     /// Currently, this is only used to generate `CREATE TRIGGER` statements for the raw table.
#[serde(default)] pub table_name: Option, + #[serde(default)] + pub local_only_columns: ColumnFilter, #[serde(flatten)] pub options: CommonTableOptions, } diff --git a/dart/test/schema_test.dart b/dart/test/schema_test.dart index 772e42a..c43ba35 100644 --- a/dart/test/schema_test.dart +++ b/dart/test/schema_test.dart @@ -280,6 +280,29 @@ END''', delete: ''' CREATE TRIGGER "test_delete" AFTER DELETE ON "users" FOR EACH ROW WHEN NOT powersync_in_sync_operation() BEGIN INSERT INTO powersync_crud(op,id,type,old_values) VALUES ('DELETE', OLD.id, 'sync_type', json_object('email', powersync_strip_subtype(OLD."email"), 'email_verified', powersync_strip_subtype(OLD."email_verified"))); +END''', + ), + // Local-only columns, should not be included in ps_crud. + _RawTableTestCase( + createTable: + 'CREATE TABLE users (id TEXT, synced_a TEXT, synced_b TEXT, local_a TEXT, local_b TEXT);', + tableOptions: { + 'table_name': 'users', + 'local_only_columns': ['local_a', 'local_b'], + }, + insert: ''' +CREATE TRIGGER "test_insert" AFTER INSERT ON "users" FOR EACH ROW WHEN NOT powersync_in_sync_operation() BEGIN +INSERT INTO powersync_crud(op,id,type,data) VALUES ('PUT', NEW.id, 'sync_type', json(powersync_diff('{}', json_object('synced_a', powersync_strip_subtype(NEW."synced_a"), 'synced_b', powersync_strip_subtype(NEW."synced_b"))))); +END''', + update: ''' +CREATE TRIGGER "test_update" AFTER UPDATE ON "users" FOR EACH ROW WHEN NOT powersync_in_sync_operation() AND +(OLD."synced_a" IS NOT NEW."synced_a" OR OLD."synced_b" IS NOT NEW."synced_b") BEGIN +SELECT CASE WHEN (OLD.id != NEW.id) THEN RAISE (FAIL, 'Cannot update id') END; +INSERT INTO powersync_crud(op,id,type,data,options) VALUES ('PATCH', NEW.id, 'sync_type', json(powersync_diff(json_object('synced_a', powersync_strip_subtype(OLD."synced_a"), 'synced_b', powersync_strip_subtype(OLD."synced_b")), json_object('synced_a', powersync_strip_subtype(NEW."synced_a"), 'synced_b', powersync_strip_subtype(NEW."synced_b")))), 0); +END''', + delete: ''' +CREATE TRIGGER "test_delete" AFTER DELETE ON "users" FOR EACH ROW WHEN NOT powersync_in_sync_operation() BEGIN +INSERT INTO powersync_crud(op,id,type) VALUES ('DELETE', OLD.id, 'sync_type'); END''', ), ]; @@ -330,9 +353,9 @@ SELECT db.select("SELECT name, sql FROM sqlite_schema WHERE type = 'trigger'"); // Uncomment to help update expectations - // for (final row in foundTriggers) { + //for (final row in foundTriggers) { // print(row['sql']); - // } + //} expect(foundTriggers, [ {'name': 'test_insert', 'sql': insert},