Skip to content

Commit e6986a5

Browse files
authored
Merge pull request #34444 from teskje/fix-replacement-mv-collection-creation
adapter,storage: fix creation of storage collections for replacement MVs
2 parents 301e099 + 6a6b253 commit e6986a5

File tree

2 files changed

+12
-26
lines changed

2 files changed

+12
-26
lines changed

src/adapter/src/coord/sequencer/inner/create_materialized_view.rs

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -611,12 +611,8 @@ impl Coordinator {
611611
&read_holds_owned
612612
};
613613

614-
let (dataflow_as_of, storage_as_of, until) = self.select_timestamps(
615-
id_bundle,
616-
refresh_schedule.as_ref(),
617-
read_holds,
618-
replacement_target,
619-
)?;
614+
let (dataflow_as_of, storage_as_of, until) =
615+
self.select_timestamps(id_bundle, refresh_schedule.as_ref(), read_holds)?;
620616

621617
tracing::info!(
622618
dataflow_as_of = ?dataflow_as_of,
@@ -781,7 +777,6 @@ impl Coordinator {
781777
id_bundle: CollectionIdBundle,
782778
refresh_schedule: Option<&RefreshSchedule>,
783779
read_holds: &ReadHolds<mz_repr::Timestamp>,
784-
replacement_target: Option<CatalogItemId>,
785780
) -> Result<
786781
(
787782
Antichain<mz_repr::Timestamp>,
@@ -845,24 +840,6 @@ impl Coordinator {
845840
.and_then(|r| r.try_step_forward());
846841
let until = Antichain::from_iter(until_ts);
847842

848-
// If this is a replacement MV, ensure that `storage_as_of` > the `since` of the target
849-
// storage collection. The storage controller requires the `since` of a storage collection
850-
// to always be greater than the `since`s of the collections it depends on.
851-
if let Some(target_id) = replacement_target {
852-
let target_gid = self.catalog().get_entry(&target_id).latest_global_id();
853-
let frontiers = self
854-
.controller
855-
.storage_collections
856-
.collection_frontiers(target_gid)
857-
.expect("replacement target exists");
858-
let lower_bound = frontiers
859-
.read_capabilities
860-
.iter()
861-
.map(|t| t.step_forward())
862-
.collect();
863-
storage_as_of.join_assign(&lower_bound);
864-
}
865-
866843
Ok((dataflow_as_of, storage_as_of, until))
867844
}
868845

src/storage-client/src/storage_collections.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1874,11 +1874,20 @@ where
18741874
// somewhere
18751875
debug!("mapping GlobalId={} to shard ({})", id, metadata.data_shard);
18761876

1877+
// If this collection has a primary, the primary is responsible for downgrading
1878+
// the critical since and it would be an error if we did so here while opening
1879+
// the since handle.
1880+
let since = if description.primary.is_some() {
1881+
None
1882+
} else {
1883+
description.since.as_ref()
1884+
};
1885+
18771886
let (write, mut since_handle) = this
18781887
.open_data_handles(
18791888
&id,
18801889
metadata.data_shard,
1881-
description.since.as_ref(),
1890+
since,
18821891
metadata.relation_desc.clone(),
18831892
persist_client,
18841893
)

0 commit comments

Comments
 (0)