diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index 3703534979c..a9fcc833e99 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -184,7 +184,6 @@ impl DeploymentStore { schema: &InputSchema, deployment: DeploymentCreate, site: Arc, - graft_base: Option>, replace: bool, on_sync: OnSync, index_def: Option, @@ -217,7 +216,7 @@ impl DeploymentStore { let query = format!("create schema {}", &site.namespace); conn.batch_execute(&query).await?; - let layout = Layout::create_relational_schema( + let _ = Layout::create_relational_schema( conn, site.clone(), schema, @@ -225,19 +224,6 @@ impl DeploymentStore { index_def, ) .await?; - // See if we are grafting and check that the graft is permissible - if let Some(base) = graft_base { - let errors = layout.can_copy_from(&base); - if !errors.is_empty() { - return Err(StoreError::Unknown(anyhow!( - "The subgraph `{}` cannot be used as the graft base \ - for `{}` because the schemas are incompatible:\n - {}", - &base.catalog.site.namespace, - &layout.catalog.site.namespace, - errors.join("\n - ") - ))); - } - } // Create data sources table if site.schema_version.private_data_sources() { diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index 478d21eba02..74ec326de6c 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -39,6 +39,7 @@ use graph::{ use graph::{derive::CheapClone, futures03::future::join_all, prelude::alloy::primitives::Address}; use crate::{ + catalog::Catalog, deployment::{OnSync, SubgraphHealth}, primary::{self, DeploymentId, Mirror as PrimaryMirror, Primary, Site}, relational::{ @@ -88,7 +89,7 @@ impl Shard { .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_') { return Err(StoreError::InvalidIdentifier(format!( - "shard name `{}` is invalid: shard names must only contain lowercase alphanumeric characters or '_'", name + "shard name `{name}` is invalid: shard names must only contain lowercase alphanumeric characters or '_'" ))); } Ok(Shard(name)) @@ -339,7 +340,7 @@ impl SubgraphStore { self.evict(schema.id())?; let graft_base = deployment.graft_base.as_ref(); - let (site, exists, node_id) = { + let (site, deployment_store, node_id) = { // We need to deal with two situations: // (1) We are really creating a new subgraph; it therefore needs // to go in the shard and onto the node that the placement @@ -351,42 +352,69 @@ impl SubgraphStore { // assignment that we used last time to avoid creating // the same deployment in another shard let (shard, node_id) = self.place(&name, &network_name, node_id).await?; - let mut conn = self.primary_conn().await?; - let (site, site_was_created) = conn + + let mut pconn = self.primary_conn().await?; + + let (site, site_was_created) = pconn .allocate_site(shard, schema.id(), network_name, graft_base) .await?; - let node_id = conn.assigned_node(&site).await?.unwrap_or(node_id); - (site, !site_was_created, node_id) - }; - let site = Arc::new(site); + let node_id = pconn.assigned_node(&site).await?.unwrap_or(node_id); + let site = Arc::new(site); + let deployment_store = self + .stores + .get(&site.shard) + .ok_or_else(|| StoreError::UnknownShard(site.shard.to_string()))?; - // if the deployment already exists, we don't need to perform any copying - // so we can set graft_base to None - // if it doesn't exist, we need to copy the graft base to the new deployment - let graft_base_layout = if !exists { - let graft_base = match deployment.graft_base.as_ref() { - Some(base) => Some(self.layout(base).await?), - None => None, + let mut shard_conn = deployment_store.get_replica_conn(ReplicaId::Main).await?; + let needs_check = if site_was_created { + true + } else { + // If deployment does not exist, but site exists it means + // that we are recovering from a failed deployment creation with an orphaned site. + // In that case, we should check graft compatibility again. + let exists = crate::deployment::exists(&mut shard_conn, &site).await?; + !exists }; - if let Some(graft_base) = &graft_base { - self.primary_conn() - .await? - .record_active_copy(graft_base.site.as_ref(), site.as_ref()) + if let Some(graft_base) = graft_base { + let base_layout = self.layout(graft_base).await?; + + if needs_check { + let entities_with_causality_region = + deployment.manifest.entities_with_causality_region.clone(); + let catalog = Catalog::for_creation( + &mut shard_conn, + site.cheap_clone(), + entities_with_causality_region.into_iter().collect(), + ) .await?; + let layout = Layout::new(site.cheap_clone(), schema, catalog)?; + + let errors = layout.can_copy_from(&base_layout); + if !errors.is_empty() { + return Err(StoreError::Unknown(anyhow!( + "The subgraph `{}` cannot be used as the graft base \ + for `{}` because the schemas are incompatible:\n - {}", + &base_layout.catalog.site.namespace, + &layout.catalog.site.namespace, + errors.join("\n - ") + ))); + } + + // Only record active copy when the graft check passes and a copy is needed. + // If deployment already exists, the copy has either completed (no active_copies + // record) or is in progress (active_copies record already exists). + pconn + .record_active_copy(base_layout.site.as_ref(), site.as_ref()) + .await?; + } } - graft_base - } else { - None + + (site, deployment_store, node_id) }; // Create the actual databases schema and metadata entries - let deployment_store = self - .stores - .get(&site.shard) - .ok_or_else(|| StoreError::UnknownShard(site.shard.to_string()))?; - - let index_def = if let Some(graft) = &graft_base.clone() { + let index_def = if let Some(graft) = graft_base { if let Some(site) = self.sites.get(graft) { let store = self .stores @@ -406,7 +434,6 @@ impl SubgraphStore { schema, deployment, site.clone(), - graft_base_layout, replace, OnSync::None, index_def, @@ -731,8 +758,7 @@ impl Inner { if src.id == dst.id { return Err(StoreError::Unknown(anyhow!( - "can not copy deployment {} onto itself", - src_loc + "can not copy deployment {src_loc} onto itself" ))); } // The very last thing we do when we set up a copy here is assign it @@ -740,9 +766,7 @@ impl Inner { // should not have been called. if let Some(node) = self.mirror.assigned_node(dst.as_ref()).await? { return Err(StoreError::Unknown(anyhow!( - "can not copy into deployment {} since it is already assigned to node `{}`", - dst_loc, - node + "can not copy into deployment {dst_loc} since it is already assigned to node `{node}`" ))); } let deployment = src_store.load_deployment(src.clone()).await?; @@ -758,8 +782,6 @@ impl Inner { history_blocks_override: None, }; - let graft_base = self.layout(&src.deployment).await?; - self.primary_conn() .await? .record_active_copy(src.as_ref(), dst.as_ref()) @@ -776,7 +798,6 @@ impl Inner { &src_layout.input_schema, deployment, dst.clone(), - Some(graft_base), false, on_sync, Some(index_def),