diff --git a/src/ast/dml.rs b/src/ast/dml.rs
index 63d6b86c..2d15caf7 100644
--- a/src/ast/dml.rs
+++ b/src/ast/dml.rs
@@ -37,6 +37,501 @@ use super::{
     Setting, SqliteOnConflict, TableObject, TableWithJoins,
 };
 
+/// Index column type.
+#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
+#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
+#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
+pub struct IndexColumn {
+    pub column: OrderByExpr,
+    pub operator_class: Option<Ident>,
+}
+
+impl Display for IndexColumn {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "{}", self.column)?;
+        if let Some(operator_class) = &self.operator_class {
+            write!(f, " {operator_class}")?;
+        }
+        Ok(())
+    }
+}
+
+/// CREATE INDEX statement.
+#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
+#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
+#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
+pub struct CreateIndex {
+    /// index name
+    pub name: Option<ObjectName>,
+    #[cfg_attr(feature = "visitor", visit(with = "visit_relation"))]
+    pub table_name: ObjectName,
+    pub using: Option<IndexType>,
+    pub columns: Vec<IndexColumn>,
+    pub unique: bool,
+    pub concurrently: bool,
+    pub if_not_exists: bool,
+    pub include: Vec<Ident>,
+    pub nulls_distinct: Option<bool>,
+    /// WITH clause:
+    pub with: Vec<Expr>,
+    pub predicate: Option<Expr>,
+}
+
+impl Display for CreateIndex {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(
+            f,
+            "CREATE {unique}INDEX {concurrently}{if_not_exists}",
+            unique = if self.unique { "UNIQUE " } else { "" },
+            concurrently = if self.concurrently {
+                "CONCURRENTLY "
+            } else {
+                ""
+            },
+            if_not_exists = if self.if_not_exists {
+                "IF NOT EXISTS "
+            } else {
+                ""
+            },
+        )?;
+        if let Some(value) = &self.name {
+            write!(f, "{value} ")?;
+        }
+        write!(f, "ON {}", self.table_name)?;
+        if let Some(value) = &self.using {
+            write!(f, " USING {value} ")?;
+        }
+        write!(f, "({})", display_separated(&self.columns, ","))?;
+        if !self.include.is_empty() {
+            write!(f, " INCLUDE ({})", display_separated(&self.include, ","))?;
+        }
+        if let Some(value) = self.nulls_distinct {
+            if value {
+                write!(f, " NULLS DISTINCT")?;
+            } else {
+                write!(f, " NULLS NOT DISTINCT")?;
+            }
+        }
+        if !self.with.is_empty() {
+            write!(f, " WITH ({})", display_comma_separated(&self.with))?;
+        }
+        if let Some(predicate) = &self.predicate {
+            write!(f, " WHERE {predicate}")?;
+        }
+        Ok(())
+    }
+}
+
+/// CREATE TABLE statement.
+#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
+#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
+#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
+pub struct CreateTable {
+    pub or_replace: bool,
+    pub temporary: bool,
+    pub external: bool,
+    pub dynamic: bool,
+    pub global: Option<bool>,
+    pub if_not_exists: bool,
+    pub transient: bool,
+    pub volatile: bool,
+    pub iceberg: bool,
+    /// Table name
+    #[cfg_attr(feature = "visitor", visit(with = "visit_relation"))]
+    pub name: ObjectName,
+    /// Optional schema
+    pub columns: Vec<ColumnDef>,
+    pub constraints: Vec<TableConstraint>,
+    pub hive_distribution: HiveDistributionStyle,
+    pub hive_formats: Option<HiveFormat>,
+    pub table_options: CreateTableOptions,
+    pub file_format: Option<FileFormat>,
+    pub location: Option<String>,
+    pub query: Option<Box<Query>>,
+    pub without_rowid: bool,
+    pub like: Option<CreateTableLike>,
+    pub clone: Option<ObjectName>,
+    pub version: Option<TableVersion>,
+    // For the Hive dialect, the table comment comes after the column definitions without `=`,
+    // so the `comment` field is optional and different from the comment field in the general options list.
+    // [Hive](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-CreateTable)
+    pub comment: Option<CommentDef>,
+    pub on_commit: Option<OnCommit>,
+    /// ClickHouse "ON CLUSTER" clause:
+    ///
+    pub on_cluster: Option<Ident>,
+    /// ClickHouse "PRIMARY KEY" clause.
+    ///
+    pub primary_key: Option<Box<Expr>>,
+    /// ClickHouse "ORDER BY" clause. Note that an omitted ORDER BY is different
+    /// from an empty one (represented as `()`), the latter meaning "no sorting".
+    ///
+    pub order_by: Option<OneOrManyWithParens<Expr>>,
+    /// BigQuery: A partition expression for the table.
+    ///
+    pub partition_by: Option<Box<Expr>>,
+    /// BigQuery: Table clustering column list.
+    ///
+    /// Snowflake: Table clustering list which contains base columns and expressions on base columns.
+    ///
+    pub cluster_by: Option<WrappedCollection<Vec<Expr>>>,
+    /// Hive: Table clustering column list.
+    ///
+    pub clustered_by: Option<ClusteredBy>,
+    /// Postgres `INHERITS` clause, which contains the list of tables from which
+    /// the new table inherits.
+    ///
+    pub inherits: Option<Vec<ObjectName>>,
+    /// SQLite "STRICT" clause.
+    /// If the "STRICT" table-option keyword is added to the end, after the closing ")",
+    /// then strict typing rules apply to that table.
+    pub strict: bool,
+    /// Snowflake "COPY GRANTS" clause
+    ///
+    pub copy_grants: bool,
+    /// Snowflake "ENABLE_SCHEMA_EVOLUTION" clause
+    ///
+    pub enable_schema_evolution: Option<bool>,
+    /// Snowflake "CHANGE_TRACKING" clause
+    ///
+    pub change_tracking: Option<bool>,
+    /// Snowflake "DATA_RETENTION_TIME_IN_DAYS" clause
+    ///
+    pub data_retention_time_in_days: Option<u64>,
+    /// Snowflake "MAX_DATA_EXTENSION_TIME_IN_DAYS" clause
+    ///
+    pub max_data_extension_time_in_days: Option<u64>,
+    /// Snowflake "DEFAULT_DDL_COLLATION" clause
+    ///
+    pub default_ddl_collation: Option<String>,
+    /// Snowflake "WITH AGGREGATION POLICY" clause
+    ///
+    pub with_aggregation_policy: Option<ObjectName>,
+    /// Snowflake "WITH ROW ACCESS POLICY" clause
+    ///
+    pub with_row_access_policy: Option<RowAccessPolicy>,
+    /// Snowflake "WITH TAG" clause
+    ///
+    pub with_tags: Option<Vec<Tag>>,
+    /// Snowflake "EXTERNAL_VOLUME" clause for Iceberg tables
+    ///
+    pub external_volume: Option<String>,
+    /// Snowflake "BASE_LOCATION" clause for Iceberg tables
+    ///
+    pub base_location: Option<String>,
+    /// Snowflake "CATALOG" clause for Iceberg tables
+    ///
+    pub catalog: Option<String>,
+    /// Snowflake "CATALOG_SYNC" clause for Iceberg tables
+    ///
+    pub catalog_sync: Option<String>,
+    /// Snowflake "STORAGE_SERIALIZATION_POLICY" clause for Iceberg tables
+    ///
+    pub storage_serialization_policy: Option<StorageSerializationPolicy>,
+    /// Snowflake "TARGET_LAG" clause for dynamic tables
+    ///
+    pub target_lag: Option<String>,
+    /// Snowflake "WAREHOUSE" clause for dynamic tables
+    ///
+    pub warehouse: Option<Ident>,
+    /// Snowflake "REFRESH_MODE" clause for dynamic tables
+    ///
+    pub refresh_mode: Option<RefreshModeKind>,
+    /// Snowflake "INITIALIZE" clause for dynamic tables
+    ///
+    pub initialize: Option<InitializeKind>,
+    /// Snowflake "REQUIRE USER" clause for dynamic tables
+    ///
+    pub require_user: bool,
+}
+
+impl Display for CreateTable {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        // We want to allow the following options:
+        // Empty column list, allowed by PostgreSQL:
+        //   `CREATE TABLE t ()`
+        // No columns provided for CREATE TABLE AS:
+        //   `CREATE TABLE t AS SELECT a from t2`
+        // Columns provided for CREATE TABLE AS:
+        //   `CREATE TABLE t (a INT) AS SELECT a from t2`
+        write!(
+            f,
+            "CREATE {or_replace}{external}{global}{temporary}{transient}{volatile}{iceberg}{dynamic}TABLE {if_not_exists}{name}",
+            or_replace = if self.or_replace { "OR REPLACE " } else { "" },
+            external = if self.external { "EXTERNAL " } else { "" },
+            global = self.global
+                .map(|global| {
+                    if global {
+                        "GLOBAL "
+                    } else {
+                        "LOCAL "
+                    }
+                })
+                .unwrap_or(""),
+            if_not_exists = if self.if_not_exists { "IF NOT EXISTS " } else { "" },
+            temporary = if self.temporary { "TEMPORARY " } else { "" },
+            transient = if self.transient { "TRANSIENT " } else { "" },
+            volatile = if self.volatile { "VOLATILE " } else { "" },
+            // Only for Snowflake
+            iceberg = if self.iceberg { "ICEBERG " } else { "" },
+            dynamic = if self.dynamic { "DYNAMIC " } else { "" },
+            name = self.name,
+        )?;
+        if let Some(on_cluster) = &self.on_cluster {
+            write!(f, " ON CLUSTER {on_cluster}")?;
+        }
+        if !self.columns.is_empty() || !self.constraints.is_empty() {
+            f.write_str(" (")?;
+            NewLine.fmt(f)?;
+            Indent(DisplayCommaSeparated(&self.columns)).fmt(f)?;
+            if !self.columns.is_empty() && !self.constraints.is_empty() {
+                f.write_str(",")?;
+                SpaceOrNewline.fmt(f)?;
+            }
+            Indent(DisplayCommaSeparated(&self.constraints)).fmt(f)?;
+            NewLine.fmt(f)?;
+            f.write_str(")")?;
+        } else if self.query.is_none() && self.like.is_none() && self.clone.is_none() {
+            // PostgreSQL allows `CREATE TABLE t ();`, but requires empty parens
+            f.write_str(" ()")?;
+        }
+
+        // The Hive table comment should be after the column definitions, please refer to:
+        // [Hive](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-CreateTable)
+        if let Some(comment) = &self.comment {
+            write!(f, " COMMENT '{comment}'")?;
+        }
+
+        // Only for SQLite
+        if self.without_rowid {
+            write!(f, " WITHOUT ROWID")?;
+        }
+
+        // Only for Hive
+        if let Some(l) = &self.like {
+            write!(f, " LIKE {l}")?;
+        }
+
+        if let Some(c) = &self.clone {
+            write!(f, " CLONE {c}")?;
+        }
+
+        match &self.hive_distribution {
+            HiveDistributionStyle::PARTITIONED { columns } => {
+                write!(f, " PARTITIONED BY ({})", display_comma_separated(columns))?;
+            }
+            HiveDistributionStyle::SKEWED {
+                columns,
+                on,
+                stored_as_directories,
+            } => {
+                write!(
+                    f,
+                    " SKEWED BY ({}) ON ({})",
+                    display_comma_separated(columns),
+                    display_comma_separated(on)
+                )?;
+                if *stored_as_directories {
+                    write!(f, " STORED AS DIRECTORIES")?;
+                }
+            }
+            _ => (),
+        }
+
+        if let Some(clustered_by) = &self.clustered_by {
+            write!(f, " {clustered_by}")?;
+        }
+
+        if let Some(HiveFormat {
+            row_format,
+            serde_properties,
+            storage,
+            location,
+        }) = &self.hive_formats
+        {
+            match row_format {
+                Some(HiveRowFormat::SERDE { class }) => write!(f, " ROW FORMAT SERDE '{class}'")?,
+                Some(HiveRowFormat::DELIMITED { delimiters }) => {
+                    write!(f, " ROW FORMAT DELIMITED")?;
+                    if !delimiters.is_empty() {
+                        write!(f, " {}", display_separated(delimiters, " "))?;
+                    }
+                }
+                None => (),
+            }
+            match storage {
+                Some(HiveIOFormat::IOF {
+                    input_format,
+                    output_format,
+                }) => write!(
+                    f,
+                    " STORED AS INPUTFORMAT {input_format} OUTPUTFORMAT {output_format}"
+                )?,
+                Some(HiveIOFormat::FileFormat { format }) if !self.external => {
+                    write!(f, " STORED AS {format}")?
+                }
+                _ => (),
+            }
+            if let Some(serde_properties) = serde_properties.as_ref() {
+                write!(
+                    f,
+                    " WITH SERDEPROPERTIES ({})",
+                    display_comma_separated(serde_properties)
+                )?;
+            }
+            if !self.external {
+                if let Some(loc) = location {
+                    write!(f, " LOCATION '{loc}'")?;
+                }
+            }
+        }
+        if self.external {
+            if let Some(file_format) = self.file_format {
+                write!(f, " STORED AS {file_format}")?;
+            }
+            write!(f, " LOCATION '{}'", self.location.as_ref().unwrap())?;
+        }
+
+        match &self.table_options {
+            options @ CreateTableOptions::With(_)
+            | options @ CreateTableOptions::Plain(_)
+            | options @ CreateTableOptions::TableProperties(_) => write!(f, " {options}")?,
+            _ => (),
+        }
+
+        if let Some(primary_key) = &self.primary_key {
+            write!(f, " PRIMARY KEY {primary_key}")?;
+        }
+        if let Some(order_by) = &self.order_by {
+            write!(f, " ORDER BY {order_by}")?;
+        }
+        if let Some(inherits) = &self.inherits {
+            write!(f, " INHERITS ({})", display_comma_separated(inherits))?;
+        }
+        if let Some(partition_by) = self.partition_by.as_ref() {
+            write!(f, " PARTITION BY {partition_by}")?;
+        }
+        if let Some(cluster_by) = self.cluster_by.as_ref() {
+            write!(f, " CLUSTER BY {cluster_by}")?;
+        }
+        if let options @ CreateTableOptions::Options(_) = &self.table_options {
+            write!(f, " {options}")?;
+        }
+        if let Some(external_volume) = self.external_volume.as_ref() {
+            write!(f, " EXTERNAL_VOLUME = '{external_volume}'")?;
+        }
+
+        if let Some(catalog) = self.catalog.as_ref() {
+            write!(f, " CATALOG = '{catalog}'")?;
+        }
+
+        if self.iceberg {
+            if let Some(base_location) = self.base_location.as_ref() {
+                write!(f, " BASE_LOCATION = '{base_location}'")?;
+            }
+        }
+
+        if let Some(catalog_sync) = self.catalog_sync.as_ref() {
+            write!(f, " CATALOG_SYNC = '{catalog_sync}'")?;
+        }
+
+        if let Some(storage_serialization_policy) = self.storage_serialization_policy.as_ref() {
+            write!(
+                f,
+                " STORAGE_SERIALIZATION_POLICY = {storage_serialization_policy}"
+            )?;
+        }
+
+        if self.copy_grants {
+            write!(f, " COPY GRANTS")?;
+        }
+
+        if let Some(is_enabled) = self.enable_schema_evolution {
+            write!(
+                f,
+                " ENABLE_SCHEMA_EVOLUTION={}",
+                if is_enabled { "TRUE" } else { "FALSE" }
+            )?;
+        }
+
+        if let Some(is_enabled) = self.change_tracking {
+            write!(
+                f,
+                " CHANGE_TRACKING={}",
+                if is_enabled { "TRUE" } else { "FALSE" }
+            )?;
+        }
+
+        if let Some(data_retention_time_in_days) = self.data_retention_time_in_days {
+            write!(
+                f,
+                " DATA_RETENTION_TIME_IN_DAYS={data_retention_time_in_days}",
+            )?;
+        }
+
+        if let Some(max_data_extension_time_in_days) = self.max_data_extension_time_in_days {
+            write!(
+                f,
+                " MAX_DATA_EXTENSION_TIME_IN_DAYS={max_data_extension_time_in_days}",
+            )?;
+        }
+
+        if let Some(default_ddl_collation) = &self.default_ddl_collation {
+            write!(f, " DEFAULT_DDL_COLLATION='{default_ddl_collation}'",)?;
+        }
+
+        if let Some(with_aggregation_policy) = &self.with_aggregation_policy {
+            write!(f, " WITH AGGREGATION POLICY {with_aggregation_policy}",)?;
+        }
+
+        if let Some(row_access_policy) = &self.with_row_access_policy {
+            write!(f, " {row_access_policy}",)?;
+        }
+
+        if let Some(tag) = &self.with_tags {
+            write!(f, " WITH TAG ({})", display_comma_separated(tag.as_slice()))?;
+        }
+
+        if let Some(target_lag) = &self.target_lag {
+            write!(f, " TARGET_LAG='{target_lag}'")?;
+        }
+
+        if let Some(warehouse) = &self.warehouse {
+            write!(f, " WAREHOUSE={warehouse}")?;
+        }
+
+        if let Some(refresh_mode) = &self.refresh_mode {
+            write!(f, " REFRESH_MODE={refresh_mode}")?;
+        }
+
+        if let Some(initialize) = &self.initialize {
+            write!(f, " INITIALIZE={initialize}")?;
" INITIALIZE={initialize}")?; + } + + if self.require_user { + write!(f, " REQUIRE USER")?; + } + + if self.on_commit.is_some() { + let on_commit = match self.on_commit { + Some(OnCommit::DeleteRows) => "ON COMMIT DELETE ROWS", + Some(OnCommit::PreserveRows) => "ON COMMIT PRESERVE ROWS", + Some(OnCommit::Drop) => "ON COMMIT DROP", + None => "", + }; + write!(f, " {on_commit}")?; + } + if self.strict { + write!(f, " STRICT")?; + } + if let Some(query) = &self.query { + write!(f, " AS {query}")?; + } + Ok(()) + } +} + /// INSERT statement. #[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] diff --git a/src/ast/helpers/stmt_create_table.rs b/src/ast/helpers/stmt_create_table.rs index 9e9d229e..7edb71ae 100644 --- a/src/ast/helpers/stmt_create_table.rs +++ b/src/ast/helpers/stmt_create_table.rs @@ -72,6 +72,7 @@ pub struct CreateTableBuilder { pub transient: bool, pub volatile: bool, pub iceberg: bool, + pub dynamic: bool, pub name: ObjectName, pub columns: Vec, pub constraints: Vec, @@ -83,6 +84,7 @@ pub struct CreateTableBuilder { pub without_rowid: bool, pub like: Option, pub clone: Option, + pub version: Option, pub comment: Option, pub on_commit: Option, pub on_cluster: Option, @@ -108,6 +110,11 @@ pub struct CreateTableBuilder { pub catalog_sync: Option, pub storage_serialization_policy: Option, pub table_options: CreateTableOptions, + pub target_lag: Option, + pub warehouse: Option, + pub refresh_mode: Option, + pub initialize: Option, + pub require_user: bool, } impl CreateTableBuilder { @@ -121,6 +128,7 @@ impl CreateTableBuilder { transient: false, volatile: false, iceberg: false, + dynamic: false, name, columns: vec![], constraints: vec![], @@ -132,6 +140,7 @@ impl CreateTableBuilder { without_rowid: false, like: None, clone: None, + version: None, comment: None, on_commit: None, on_cluster: None, @@ -157,6 +166,11 @@ impl CreateTableBuilder { catalog_sync: None, storage_serialization_policy: None, table_options: CreateTableOptions::None, + target_lag: None, + warehouse: None, + refresh_mode: None, + initialize: None, + require_user: false, } } pub fn or_replace(mut self, or_replace: bool) -> Self { @@ -199,6 +213,11 @@ impl CreateTableBuilder { self } + pub fn dynamic(mut self, dynamic: bool) -> Self { + self.dynamic = dynamic; + self + } + pub fn columns(mut self, columns: Vec) -> Self { self.columns = columns; self @@ -248,6 +267,11 @@ impl CreateTableBuilder { self } + pub fn version(mut self, version: Option) -> Self { + self.version = version; + self + } + pub fn comment_after_column_def(mut self, comment: Option) -> Self { self.comment = comment; self @@ -382,24 +406,29 @@ impl CreateTableBuilder { self } - /// Returns true if the statement has exactly one source of info on the schema of the new table. - /// This is Snowflake-specific, some dialects allow more than one source. 
-    pub(crate) fn validate_schema_info(&self) -> bool {
-        let mut sources = 0;
-        if !self.columns.is_empty() {
-            sources += 1;
-        }
-        if self.query.is_some() {
-            sources += 1;
-        }
-        if self.like.is_some() {
-            sources += 1;
-        }
-        if self.clone.is_some() {
-            sources += 1;
-        }
+    pub fn target_lag(mut self, target_lag: Option<String>) -> Self {
+        self.target_lag = target_lag;
+        self
+    }
 
-        sources == 1
+    pub fn warehouse(mut self, warehouse: Option<Ident>) -> Self {
+        self.warehouse = warehouse;
+        self
+    }
+
+    pub fn refresh_mode(mut self, refresh_mode: Option<RefreshModeKind>) -> Self {
+        self.refresh_mode = refresh_mode;
+        self
+    }
+
+    pub fn initialize(mut self, initialize: Option<InitializeKind>) -> Self {
+        self.initialize = initialize;
+        self
+    }
+
+    pub fn require_user(mut self, require_user: bool) -> Self {
+        self.require_user = require_user;
+        self
     }
 
     pub fn build(self) -> Statement {
@@ -412,6 +441,7 @@ impl CreateTableBuilder {
             transient: self.transient,
             volatile: self.volatile,
             iceberg: self.iceberg,
+            dynamic: self.dynamic,
             name: self.name,
             columns: self.columns,
             constraints: self.constraints,
@@ -423,6 +453,7 @@ impl CreateTableBuilder {
             without_rowid: self.without_rowid,
             like: self.like,
             clone: self.clone,
+            version: self.version,
             comment: self.comment,
             on_commit: self.on_commit,
             on_cluster: self.on_cluster,
@@ -448,6 +479,11 @@ impl CreateTableBuilder {
             catalog_sync: self.catalog_sync,
             storage_serialization_policy: self.storage_serialization_policy,
             table_options: self.table_options,
+            target_lag: self.target_lag,
+            warehouse: self.warehouse,
+            refresh_mode: self.refresh_mode,
+            initialize: self.initialize,
+            require_user: self.require_user,
         })
     }
 }
@@ -468,6 +504,7 @@ impl TryFrom<Statement> for CreateTableBuilder {
             transient,
             volatile,
             iceberg,
+            dynamic,
             name,
             columns,
             constraints,
@@ -479,6 +516,7 @@ impl TryFrom<Statement> for CreateTableBuilder {
             without_rowid,
             like,
             clone,
+            version,
             comment,
             on_commit,
             on_cluster,
@@ -504,6 +542,11 @@ impl TryFrom<Statement> for CreateTableBuilder {
             catalog_sync,
             storage_serialization_policy,
             table_options,
+            target_lag,
+            warehouse,
+            refresh_mode,
+            initialize,
+            require_user,
         }) => Ok(Self {
             or_replace,
             temporary,
@@ -511,6 +554,7 @@ impl TryFrom<Statement> for CreateTableBuilder {
             global,
             if_not_exists,
             transient,
+            dynamic,
             name,
             columns,
             constraints,
@@ -522,6 +566,7 @@ impl TryFrom<Statement> for CreateTableBuilder {
             without_rowid,
             like,
             clone,
+            version,
             comment,
             on_commit,
             on_cluster,
@@ -549,6 +594,11 @@ impl TryFrom<Statement> for CreateTableBuilder {
             catalog_sync,
             storage_serialization_policy,
             table_options,
+            target_lag,
+            warehouse,
+            refresh_mode,
+            initialize,
+            require_user,
         }),
         _ => Err(ParserError::ParserError(format!(
             "Expected create table statement, but received: {stmt}"
diff --git a/src/ast/mod.rs b/src/ast/mod.rs
index a30e2423..08df9db9 100644
--- a/src/ast/mod.rs
+++ b/src/ast/mod.rs
@@ -10521,6 +10521,48 @@ impl fmt::Display for CreateTableLike {
     }
 }
 
+/// Specifies the refresh mode for the dynamic table.
+///
+/// [Snowflake](https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table)
+#[derive(Debug, Copy, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
+#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
+#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
+pub enum RefreshModeKind {
+    Auto,
+    Full,
+    Incremental,
+}
+
+impl fmt::Display for RefreshModeKind {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        match self {
+            RefreshModeKind::Auto => write!(f, "AUTO"),
+            RefreshModeKind::Full => write!(f, "FULL"),
+            RefreshModeKind::Incremental => write!(f, "INCREMENTAL"),
+        }
+    }
+}
+
+/// Specifies the behavior of the initial refresh of the dynamic table.
+///
+/// [Snowflake](https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table)
+#[derive(Debug, Copy, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
+#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
+#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
+pub enum InitializeKind {
+    OnCreate,
+    OnSchedule,
+}
+
+impl fmt::Display for InitializeKind {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        match self {
+            InitializeKind::OnCreate => write!(f, "ON_CREATE"),
+            InitializeKind::OnSchedule => write!(f, "ON_SCHEDULE"),
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::tokenizer::Location;
diff --git a/src/ast/spans.rs b/src/ast/spans.rs
index 5d94e2dc..fc5b762a 100644
--- a/src/ast/spans.rs
+++ b/src/ast/spans.rs
@@ -579,6 +579,7 @@ impl Spanned for CreateTable {
             temporary: _, // bool
             external: _, // bool
             global: _, // bool
+            dynamic: _, // bool
             if_not_exists: _, // bool
             transient: _, // bool
             volatile: _, // bool
@@ -619,6 +620,12 @@ impl Spanned for CreateTable {
             catalog_sync: _, // todo, Snowflake specific
             storage_serialization_policy: _,
             table_options,
+            target_lag: _,
+            warehouse: _,
+            version: _,
+            refresh_mode: _,
+            initialize: _,
+            require_user: _,
         } = self;
 
         union_spans(
diff --git a/src/dialect/snowflake.rs b/src/dialect/snowflake.rs
index 7ef4de9c..f2b63212 100644
--- a/src/dialect/snowflake.rs
+++ b/src/dialect/snowflake.rs
@@ -235,6 +235,8 @@ impl Dialect for SnowflakeDialect {
             _ => None,
         };
 
+        let dynamic = parser.parse_keyword(Keyword::DYNAMIC);
+
         let mut temporary = false;
         let mut volatile = false;
         let mut transient = false;
@@ -259,7 +261,7 @@ impl Dialect for SnowflakeDialect {
             return Some(parse_create_stage(or_replace, temporary, parser));
         } else if parser.parse_keyword(Keyword::TABLE) {
             return Some(parse_create_table(
-                or_replace, global, temporary, volatile, transient, iceberg, parser,
+                or_replace, global, temporary, volatile, transient, iceberg, dynamic, parser,
             ));
         } else if parser.parse_keyword(Keyword::DATABASE) {
             return Some(parse_create_database(or_replace, transient, parser));
@@ -614,6 +616,7 @@ fn parse_alter_session(parser: &mut Parser, set: bool) -> Result
 ///
+#[allow(clippy::too_many_arguments)]
 pub fn parse_create_table(
     or_replace: bool,
     global: Option<bool>,
@@ -621,6 +624,7 @@ pub fn parse_create_table(
     volatile: bool,
     transient: bool,
     iceberg: bool,
+    dynamic: bool,
     parser: &mut Parser,
 ) -> Result<Statement, ParserError> {
     let if_not_exists = parser.parse_keywords(&[Keyword::IF, Keyword::NOT, Keyword::EXISTS]);
@@ -634,6 +638,7 @@ pub fn parse_create_table(
         .volatile(volatile)
         .iceberg(iceberg)
         .global(global)
+        .dynamic(dynamic)
         .hive_formats(Some(Default::default()));
 
     // Snowflake does not enforce order of the parameters in the statement. The parser needs to
@@ -772,6 +777,49 @@ pub fn parse_create_table(
             Keyword::IF if parser.parse_keywords(&[Keyword::NOT, Keyword::EXISTS]) => {
                 builder = builder.if_not_exists(true);
             }
+            Keyword::TARGET_LAG => {
+                parser.expect_token(&Token::Eq)?;
+                let target_lag = parser.parse_literal_string()?;
+                builder = builder.target_lag(Some(target_lag));
+            }
+            Keyword::WAREHOUSE => {
+                parser.expect_token(&Token::Eq)?;
+                let warehouse = parser.parse_identifier()?;
+                builder = builder.warehouse(Some(warehouse));
+            }
+            Keyword::AT | Keyword::BEFORE => {
+                parser.prev_token();
+                let version = parser.maybe_parse_table_version()?;
+                builder = builder.version(version);
+            }
+            Keyword::REFRESH_MODE => {
+                parser.expect_token(&Token::Eq)?;
+                let refresh_mode = match parser.parse_one_of_keywords(&[
+                    Keyword::AUTO,
+                    Keyword::FULL,
+                    Keyword::INCREMENTAL,
+                ]) {
+                    Some(Keyword::AUTO) => Some(RefreshModeKind::Auto),
+                    Some(Keyword::FULL) => Some(RefreshModeKind::Full),
+                    Some(Keyword::INCREMENTAL) => Some(RefreshModeKind::Incremental),
+                    _ => return parser.expected("AUTO, FULL or INCREMENTAL", next_token),
+                };
+                builder = builder.refresh_mode(refresh_mode);
+            }
+            Keyword::INITIALIZE => {
+                parser.expect_token(&Token::Eq)?;
+                let initialize = match parser
+                    .parse_one_of_keywords(&[Keyword::ON_CREATE, Keyword::ON_SCHEDULE])
+                {
+                    Some(Keyword::ON_CREATE) => Some(InitializeKind::OnCreate),
+                    Some(Keyword::ON_SCHEDULE) => Some(InitializeKind::OnSchedule),
+                    _ => return parser.expected("ON_CREATE or ON_SCHEDULE", next_token),
+                };
+                builder = builder.initialize(initialize);
+            }
+            Keyword::REQUIRE if parser.parse_keyword(Keyword::USER) => {
+                builder = builder.require_user(true);
+            }
             _ => {
                 return parser.expected("end of statement", next_token);
             }
@@ -782,21 +830,9 @@ pub fn parse_create_table(
                 builder = builder.columns(columns).constraints(constraints);
             }
             Token::EOF => {
-                if !builder.validate_schema_info() {
-                    return Err(ParserError::ParserError(
-                        "unexpected end of input".to_string(),
-                    ));
-                }
-
                 break;
             }
             Token::SemiColon => {
-                if !builder.validate_schema_info() {
-                    return Err(ParserError::ParserError(
-                        "unexpected end of input".to_string(),
-                    ));
-                }
-
                 parser.prev_token();
                 break;
             }
diff --git a/src/keywords.rs b/src/keywords.rs
index 659bc043..f0230226 100644
--- a/src/keywords.rs
+++ b/src/keywords.rs
@@ -300,6 +300,7 @@ define_keywords!(
     DOMAIN,
     DOUBLE,
     DOW,
+    DOWNSTREAM,
     DOY,
     DROP,
     DRY,
@@ -445,10 +446,12 @@ define_keywords!(
     INCLUDE_NULL_VALUES,
     INCLUDING,
     INCREMENT,
+    INCREMENTAL,
     INDEX,
     INDICATOR,
     INHERIT,
     INHERITS,
+    INITIALIZE,
     INITIALLY,
     INNER,
     INOUT,
@@ -641,6 +644,8 @@ define_keywords!(
     ON,
     ONE,
     ONLY,
+    ON_CREATE,
+    ON_SCHEDULE,
     OPEN,
     OPENJSON,
     OPERATE,
@@ -742,6 +747,7 @@ define_keywords!(
     REF,
     REFERENCES,
     REFERENCING,
+    REFRESH_MODE,
     REGCLASS,
     REGEXP,
     REGR_AVGX,
@@ -768,6 +774,7 @@ define_keywords!(
     REPLICA,
     REPLICATE,
     REPLICATION,
+    REQUIRE,
     RESET,
     RESOLVE,
     RESOURCE,
@@ -905,6 +912,7 @@ define_keywords!(
     TABLESPACE,
     TAG,
     TARGET,
+    TARGET_LAG,
     TASK,
     TBLPROPERTIES,
     TEMP,
diff --git a/tests/sqlparser_duckdb.rs b/tests/sqlparser_duckdb.rs
index fe14b7ba..5bad7336 100644
--- a/tests/sqlparser_duckdb.rs
+++ b/tests/sqlparser_duckdb.rs
@@ -700,6 +700,7 @@ fn test_duckdb_union_datatype() {
             transient: Default::default(),
             volatile: Default::default(),
             iceberg: Default::default(),
+            dynamic: Default::default(),
             name: ObjectName::from(vec!["tbl1".into()]),
             columns: vec![
                 ColumnDef {
@@ -774,7 +775,13 @@ fn test_duckdb_union_datatype() {
             catalog: Default::default(),
             catalog_sync: Default::default(),
             storage_serialization_policy: Default::default(),
-            table_options: CreateTableOptions::None
+            table_options: CreateTableOptions::None,
+            target_lag: None,
+            warehouse: None,
+            version: None,
+            refresh_mode: None,
+            initialize: None,
+            require_user: Default::default(),
         }),
         stmt
     );
diff --git a/tests/sqlparser_mssql.rs b/tests/sqlparser_mssql.rs
index 63e4eecb..a1e05d03 100644
--- a/tests/sqlparser_mssql.rs
+++ b/tests/sqlparser_mssql.rs
@@ -1848,6 +1848,7 @@ fn parse_create_table_with_valid_options() {
             temporary: false,
             external: false,
             global: None,
+            dynamic: false,
             if_not_exists: false,
             transient: false,
             volatile: false,
@@ -1924,7 +1925,13 @@ fn parse_create_table_with_valid_options() {
             catalog: None,
             catalog_sync: None,
             storage_serialization_policy: None,
-            table_options: CreateTableOptions::With(with_options)
+            table_options: CreateTableOptions::With(with_options),
+            target_lag: None,
+            warehouse: None,
+            version: None,
+            refresh_mode: None,
+            initialize: None,
+            require_user: false,
         })
     );
 }
@@ -2031,6 +2038,7 @@ fn parse_create_table_with_identity_column() {
             temporary: false,
             external: false,
             global: None,
+            dynamic: false,
             if_not_exists: false,
             transient: false,
             volatile: false,
@@ -2088,7 +2096,13 @@ fn parse_create_table_with_identity_column() {
             catalog: None,
             catalog_sync: None,
             storage_serialization_policy: None,
-            table_options: CreateTableOptions::None
+            table_options: CreateTableOptions::None,
+            target_lag: None,
+            warehouse: None,
+            version: None,
+            refresh_mode: None,
+            initialize: None,
+            require_user: false,
         }),
     );
 }
diff --git a/tests/sqlparser_postgres.rs b/tests/sqlparser_postgres.rs
index a7c9779b..43b30aad 100644
--- a/tests/sqlparser_postgres.rs
+++ b/tests/sqlparser_postgres.rs
@@ -5932,6 +5932,7 @@ fn parse_trigger_related_functions() {
                 temporary: false,
                 external: false,
                 global: None,
+                dynamic: false,
                 if_not_exists: false,
                 transient: false,
                 volatile: false,
@@ -5997,7 +5998,13 @@ fn parse_trigger_related_functions() {
                 catalog: None,
                 catalog_sync: None,
                 storage_serialization_policy: None,
-                table_options: CreateTableOptions::None
+                table_options: CreateTableOptions::None,
+                target_lag: None,
+                warehouse: None,
+                version: None,
+                refresh_mode: None,
+                initialize: None,
+                require_user: false,
             }
         );
diff --git a/tests/sqlparser_snowflake.rs b/tests/sqlparser_snowflake.rs
index 55f6ab28..3745ec99 100644
--- a/tests/sqlparser_snowflake.rs
+++ b/tests/sqlparser_snowflake.rs
@@ -528,23 +528,6 @@ fn test_snowflake_create_table_comment() {
     }
 }
 
-#[test]
-fn test_snowflake_create_table_incomplete_statement() {
-    assert_eq!(
-        snowflake().parse_sql_statements("CREATE TABLE my_table"),
-        Err(ParserError::ParserError(
-            "unexpected end of input".to_string()
-        ))
-    );
-
-    assert_eq!(
-        snowflake().parse_sql_statements("CREATE TABLE my_table; (c int)"),
-        Err(ParserError::ParserError(
-            "unexpected end of input".to_string()
-        ))
-    );
-}
-
 #[test]
 fn test_snowflake_single_line_tokenize() {
     let sql = "CREATE TABLE# this is a comment \ntable_1";
@@ -1019,27 +1002,6 @@ fn test_snowflake_create_table_trailing_options() {
         .unwrap();
 }
 
-#[test]
-fn test_snowflake_create_table_valid_schema_info() {
-    // Validate there's exactly one source of information on the schema of the new table
-    assert_eq!(
-        snowflake()
-            .parse_sql_statements("CREATE TABLE dst")
-            .is_err(),
-        true
-    );
-    assert_eq!(
-        snowflake().parse_sql_statements("CREATE OR REPLACE TEMP TABLE dst LIKE src AS (SELECT * FROM CUSTOMERS) ON COMMIT PRESERVE ROWS").is_err(),
-        true
-    );
-    assert_eq!(
-        snowflake()
.parse_sql_statements("CREATE OR REPLACE TEMP TABLE dst CLONE customers LIKE customer2") - .is_err(), - true - ); -} - #[test] fn parse_sf_create_or_replace_view_with_comment_missing_equal() { assert!(snowflake_and_generic() @@ -1104,6 +1066,56 @@ fn parse_sf_create_table_or_view_with_dollar_quoted_comment() { ); } +#[test] +fn parse_create_dynamic_table() { + snowflake().verified_stmt(r#"CREATE OR REPLACE DYNAMIC TABLE my_dynamic_table TARGET_LAG='20 minutes' WAREHOUSE=mywh AS SELECT product_id, product_name FROM staging_table"#); + snowflake() + .parse_sql_statements( + r#" +CREATE DYNAMIC ICEBERG TABLE my_dynamic_table (date TIMESTAMP_NTZ, id NUMBER, content STRING) + TARGET_LAG = '20 minutes' + WAREHOUSE = mywh + EXTERNAL_VOLUME = 'my_external_volume' + CATALOG = 'SNOWFLAKE' + BASE_LOCATION = 'my_iceberg_table' + AS + SELECT product_id, product_name FROM staging_table; + "#, + ) + .unwrap(); + + snowflake() + .parse_sql_statements( + r#" +CREATE DYNAMIC TABLE my_dynamic_table (date TIMESTAMP_NTZ, id NUMBER, content VARIANT) + TARGET_LAG = '20 minutes' + WAREHOUSE = mywh + CLUSTER BY (date, id) + AS + SELECT product_id, product_name FROM staging_table; + "#, + ) + .unwrap(); + + snowflake().parse_sql_statements(r#" +CREATE DYNAMIC TABLE my_cloned_dynamic_table CLONE my_dynamic_table AT (TIMESTAMP => TO_TIMESTAMP_TZ('04/05/2013 01:02:03', 'mm/dd/yyyy hh24:mi:ss')); + "#).unwrap(); + + snowflake() + .parse_sql_statements( + r#" +CREATE DYNAMIC TABLE my_dynamic_table + TARGET_LAG = 'DOWNSTREAM' + WAREHOUSE = mywh + INITIALIZE = on_schedule + REQUIRE USER + AS + SELECT product_id, product_name FROM staging_table; + "#, + ) + .unwrap(); +} + #[test] fn test_sf_derived_table_in_parenthesis() { // Nesting a subquery in an extra set of parentheses is non-standard,