Snowflake: Add support for CREATE DYNAMIC TABLE (#1960)

This commit is contained in:
Yoav Cohen 2025-08-20 12:08:22 +03:00 committed by GitHub
parent 3b5242821e
commit 4b8797e8f3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 342 additions and 96 deletions

View file

@ -33,10 +33,11 @@ use crate::ast::{
display_comma_separated, display_separated, ArgMode, CommentDef, CreateFunctionBody, display_comma_separated, display_separated, ArgMode, CommentDef, CreateFunctionBody,
CreateFunctionUsing, CreateTableLikeKind, CreateTableOptions, DataType, Expr, FileFormat, CreateFunctionUsing, CreateTableLikeKind, CreateTableOptions, DataType, Expr, FileFormat,
FunctionBehavior, FunctionCalledOnNull, FunctionDeterminismSpecifier, FunctionParallel, FunctionBehavior, FunctionCalledOnNull, FunctionDeterminismSpecifier, FunctionParallel,
HiveDistributionStyle, HiveFormat, HiveIOFormat, HiveRowFormat, Ident, MySQLColumnPosition, HiveDistributionStyle, HiveFormat, HiveIOFormat, HiveRowFormat, Ident, InitializeKind,
ObjectName, OnCommit, OneOrManyWithParens, OperateFunctionArg, OrderByExpr, ProjectionSelect, MySQLColumnPosition, ObjectName, OnCommit, OneOrManyWithParens, OperateFunctionArg,
Query, RowAccessPolicy, SequenceOptions, Spanned, SqlOption, StorageSerializationPolicy, Tag, OrderByExpr, ProjectionSelect, Query, RefreshModeKind, RowAccessPolicy, SequenceOptions,
Value, ValueWithSpan, WrappedCollection, Spanned, SqlOption, StorageSerializationPolicy, TableVersion, Tag, Value, ValueWithSpan,
WrappedCollection,
}; };
use crate::display_utils::{DisplayCommaSeparated, Indent, NewLine, SpaceOrNewline}; use crate::display_utils::{DisplayCommaSeparated, Indent, NewLine, SpaceOrNewline};
use crate::keywords::Keyword; use crate::keywords::Keyword;
@ -2428,6 +2429,7 @@ pub struct CreateTable {
pub or_replace: bool, pub or_replace: bool,
pub temporary: bool, pub temporary: bool,
pub external: bool, pub external: bool,
pub dynamic: bool,
pub global: Option<bool>, pub global: Option<bool>,
pub if_not_exists: bool, pub if_not_exists: bool,
pub transient: bool, pub transient: bool,
@ -2448,6 +2450,7 @@ pub struct CreateTable {
pub without_rowid: bool, pub without_rowid: bool,
pub like: Option<CreateTableLikeKind>, pub like: Option<CreateTableLikeKind>,
pub clone: Option<ObjectName>, pub clone: Option<ObjectName>,
pub version: Option<TableVersion>,
// For Hive dialect, the table comment is after the column definitions without `=`, // For Hive dialect, the table comment is after the column definitions without `=`,
// so the `comment` field is optional and different than the comment field in the general options list. // so the `comment` field is optional and different than the comment field in the general options list.
// [Hive](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-CreateTable) // [Hive](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-CreateTable)
@ -2525,6 +2528,21 @@ pub struct CreateTable {
/// Snowflake "STORAGE_SERIALIZATION_POLICY" clause for Iceberg tables /// Snowflake "STORAGE_SERIALIZATION_POLICY" clause for Iceberg tables
/// <https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table> /// <https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table>
pub storage_serialization_policy: Option<StorageSerializationPolicy>, pub storage_serialization_policy: Option<StorageSerializationPolicy>,
/// Snowflake "TARGET_LAG" clause for dybamic tables
/// <https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table>
pub target_lag: Option<String>,
/// Snowflake "WAREHOUSE" clause for dybamic tables
/// <https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table>
pub warehouse: Option<Ident>,
/// Snowflake "REFRESH_MODE" clause for dybamic tables
/// <https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table>
pub refresh_mode: Option<RefreshModeKind>,
/// Snowflake "INITIALIZE" clause for dybamic tables
/// <https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table>
pub initialize: Option<InitializeKind>,
/// Snowflake "REQUIRE USER" clause for dybamic tables
/// <https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table>
pub require_user: bool,
} }
impl fmt::Display for CreateTable { impl fmt::Display for CreateTable {
@ -2538,7 +2556,7 @@ impl fmt::Display for CreateTable {
// `CREATE TABLE t (a INT) AS SELECT a from t2` // `CREATE TABLE t (a INT) AS SELECT a from t2`
write!( write!(
f, f,
"CREATE {or_replace}{external}{global}{temporary}{transient}{volatile}{iceberg}TABLE {if_not_exists}{name}", "CREATE {or_replace}{external}{global}{temporary}{transient}{volatile}{dynamic}{iceberg}TABLE {if_not_exists}{name}",
or_replace = if self.or_replace { "OR REPLACE " } else { "" }, or_replace = if self.or_replace { "OR REPLACE " } else { "" },
external = if self.external { "EXTERNAL " } else { "" }, external = if self.external { "EXTERNAL " } else { "" },
global = self.global global = self.global
@ -2556,6 +2574,7 @@ impl fmt::Display for CreateTable {
volatile = if self.volatile { "VOLATILE " } else { "" }, volatile = if self.volatile { "VOLATILE " } else { "" },
// Only for Snowflake // Only for Snowflake
iceberg = if self.iceberg { "ICEBERG " } else { "" }, iceberg = if self.iceberg { "ICEBERG " } else { "" },
dynamic = if self.dynamic { "DYNAMIC " } else { "" },
name = self.name, name = self.name,
)?; )?;
if let Some(on_cluster) = &self.on_cluster { if let Some(on_cluster) = &self.on_cluster {
@ -2598,6 +2617,10 @@ impl fmt::Display for CreateTable {
write!(f, " CLONE {c}")?; write!(f, " CLONE {c}")?;
} }
if let Some(version) = &self.version {
write!(f, " {version}")?;
}
match &self.hive_distribution { match &self.hive_distribution {
HiveDistributionStyle::PARTITIONED { columns } => { HiveDistributionStyle::PARTITIONED { columns } => {
write!(f, " PARTITIONED BY ({})", display_comma_separated(columns))?; write!(f, " PARTITIONED BY ({})", display_comma_separated(columns))?;
@ -2700,27 +2723,27 @@ impl fmt::Display for CreateTable {
write!(f, " {options}")?; write!(f, " {options}")?;
} }
if let Some(external_volume) = self.external_volume.as_ref() { if let Some(external_volume) = self.external_volume.as_ref() {
write!(f, " EXTERNAL_VOLUME = '{external_volume}'")?; write!(f, " EXTERNAL_VOLUME='{external_volume}'")?;
} }
if let Some(catalog) = self.catalog.as_ref() { if let Some(catalog) = self.catalog.as_ref() {
write!(f, " CATALOG = '{catalog}'")?; write!(f, " CATALOG='{catalog}'")?;
} }
if self.iceberg { if self.iceberg {
if let Some(base_location) = self.base_location.as_ref() { if let Some(base_location) = self.base_location.as_ref() {
write!(f, " BASE_LOCATION = '{base_location}'")?; write!(f, " BASE_LOCATION='{base_location}'")?;
} }
} }
if let Some(catalog_sync) = self.catalog_sync.as_ref() { if let Some(catalog_sync) = self.catalog_sync.as_ref() {
write!(f, " CATALOG_SYNC = '{catalog_sync}'")?; write!(f, " CATALOG_SYNC='{catalog_sync}'")?;
} }
if let Some(storage_serialization_policy) = self.storage_serialization_policy.as_ref() { if let Some(storage_serialization_policy) = self.storage_serialization_policy.as_ref() {
write!( write!(
f, f,
" STORAGE_SERIALIZATION_POLICY = {storage_serialization_policy}" " STORAGE_SERIALIZATION_POLICY={storage_serialization_policy}"
)?; )?;
} }
@ -2774,6 +2797,26 @@ impl fmt::Display for CreateTable {
write!(f, " WITH TAG ({})", display_comma_separated(tag.as_slice()))?; write!(f, " WITH TAG ({})", display_comma_separated(tag.as_slice()))?;
} }
if let Some(target_lag) = &self.target_lag {
write!(f, " TARGET_LAG='{target_lag}'")?;
}
if let Some(warehouse) = &self.warehouse {
write!(f, " WAREHOUSE={warehouse}")?;
}
if let Some(refresh_mode) = &self.refresh_mode {
write!(f, " REFRESH_MODE={refresh_mode}")?;
}
if let Some(initialize) = &self.initialize {
write!(f, " INITIALIZE={initialize}")?;
}
if self.require_user {
write!(f, " REQUIRE USER")?;
}
if self.on_commit.is_some() { if self.on_commit.is_some() {
let on_commit = match self.on_commit { let on_commit = match self.on_commit {
Some(OnCommit::DeleteRows) => "ON COMMIT DELETE ROWS", Some(OnCommit::DeleteRows) => "ON COMMIT DELETE ROWS",

View file

@ -26,9 +26,9 @@ use sqlparser_derive::{Visit, VisitMut};
use crate::ast::{ use crate::ast::{
ClusteredBy, ColumnDef, CommentDef, CreateTable, CreateTableLikeKind, CreateTableOptions, Expr, ClusteredBy, ColumnDef, CommentDef, CreateTable, CreateTableLikeKind, CreateTableOptions, Expr,
FileFormat, HiveDistributionStyle, HiveFormat, Ident, ObjectName, OnCommit, FileFormat, HiveDistributionStyle, HiveFormat, Ident, InitializeKind, ObjectName, OnCommit,
OneOrManyWithParens, Query, RowAccessPolicy, Statement, StorageSerializationPolicy, OneOrManyWithParens, Query, RefreshModeKind, RowAccessPolicy, Statement,
TableConstraint, Tag, WrappedCollection, StorageSerializationPolicy, TableConstraint, TableVersion, Tag, WrappedCollection,
}; };
use crate::parser::ParserError; use crate::parser::ParserError;
@ -72,6 +72,7 @@ pub struct CreateTableBuilder {
pub transient: bool, pub transient: bool,
pub volatile: bool, pub volatile: bool,
pub iceberg: bool, pub iceberg: bool,
pub dynamic: bool,
pub name: ObjectName, pub name: ObjectName,
pub columns: Vec<ColumnDef>, pub columns: Vec<ColumnDef>,
pub constraints: Vec<TableConstraint>, pub constraints: Vec<TableConstraint>,
@ -83,6 +84,7 @@ pub struct CreateTableBuilder {
pub without_rowid: bool, pub without_rowid: bool,
pub like: Option<CreateTableLikeKind>, pub like: Option<CreateTableLikeKind>,
pub clone: Option<ObjectName>, pub clone: Option<ObjectName>,
pub version: Option<TableVersion>,
pub comment: Option<CommentDef>, pub comment: Option<CommentDef>,
pub on_commit: Option<OnCommit>, pub on_commit: Option<OnCommit>,
pub on_cluster: Option<Ident>, pub on_cluster: Option<Ident>,
@ -108,6 +110,11 @@ pub struct CreateTableBuilder {
pub catalog_sync: Option<String>, pub catalog_sync: Option<String>,
pub storage_serialization_policy: Option<StorageSerializationPolicy>, pub storage_serialization_policy: Option<StorageSerializationPolicy>,
pub table_options: CreateTableOptions, pub table_options: CreateTableOptions,
pub target_lag: Option<String>,
pub warehouse: Option<Ident>,
pub refresh_mode: Option<RefreshModeKind>,
pub initialize: Option<InitializeKind>,
pub require_user: bool,
} }
impl CreateTableBuilder { impl CreateTableBuilder {
@ -121,6 +128,7 @@ impl CreateTableBuilder {
transient: false, transient: false,
volatile: false, volatile: false,
iceberg: false, iceberg: false,
dynamic: false,
name, name,
columns: vec![], columns: vec![],
constraints: vec![], constraints: vec![],
@ -132,6 +140,7 @@ impl CreateTableBuilder {
without_rowid: false, without_rowid: false,
like: None, like: None,
clone: None, clone: None,
version: None,
comment: None, comment: None,
on_commit: None, on_commit: None,
on_cluster: None, on_cluster: None,
@ -157,6 +166,11 @@ impl CreateTableBuilder {
catalog_sync: None, catalog_sync: None,
storage_serialization_policy: None, storage_serialization_policy: None,
table_options: CreateTableOptions::None, table_options: CreateTableOptions::None,
target_lag: None,
warehouse: None,
refresh_mode: None,
initialize: None,
require_user: false,
} }
} }
pub fn or_replace(mut self, or_replace: bool) -> Self { pub fn or_replace(mut self, or_replace: bool) -> Self {
@ -199,6 +213,11 @@ impl CreateTableBuilder {
self self
} }
pub fn dynamic(mut self, dynamic: bool) -> Self {
self.dynamic = dynamic;
self
}
pub fn columns(mut self, columns: Vec<ColumnDef>) -> Self { pub fn columns(mut self, columns: Vec<ColumnDef>) -> Self {
self.columns = columns; self.columns = columns;
self self
@ -248,6 +267,11 @@ impl CreateTableBuilder {
self self
} }
pub fn version(mut self, version: Option<TableVersion>) -> Self {
self.version = version;
self
}
pub fn comment_after_column_def(mut self, comment: Option<CommentDef>) -> Self { pub fn comment_after_column_def(mut self, comment: Option<CommentDef>) -> Self {
self.comment = comment; self.comment = comment;
self self
@ -382,24 +406,29 @@ impl CreateTableBuilder {
self self
} }
/// Returns true if the statement has exactly one source of info on the schema of the new table. pub fn target_lag(mut self, target_lag: Option<String>) -> Self {
/// This is Snowflake-specific, some dialects allow more than one source. self.target_lag = target_lag;
pub(crate) fn validate_schema_info(&self) -> bool { self
let mut sources = 0; }
if !self.columns.is_empty() {
sources += 1;
}
if self.query.is_some() {
sources += 1;
}
if self.like.is_some() {
sources += 1;
}
if self.clone.is_some() {
sources += 1;
}
sources == 1 pub fn warehouse(mut self, warehouse: Option<Ident>) -> Self {
self.warehouse = warehouse;
self
}
pub fn refresh_mode(mut self, refresh_mode: Option<RefreshModeKind>) -> Self {
self.refresh_mode = refresh_mode;
self
}
pub fn initialize(mut self, initialize: Option<InitializeKind>) -> Self {
self.initialize = initialize;
self
}
pub fn require_user(mut self, require_user: bool) -> Self {
self.require_user = require_user;
self
} }
pub fn build(self) -> Statement { pub fn build(self) -> Statement {
@ -412,6 +441,7 @@ impl CreateTableBuilder {
transient: self.transient, transient: self.transient,
volatile: self.volatile, volatile: self.volatile,
iceberg: self.iceberg, iceberg: self.iceberg,
dynamic: self.dynamic,
name: self.name, name: self.name,
columns: self.columns, columns: self.columns,
constraints: self.constraints, constraints: self.constraints,
@ -423,6 +453,7 @@ impl CreateTableBuilder {
without_rowid: self.without_rowid, without_rowid: self.without_rowid,
like: self.like, like: self.like,
clone: self.clone, clone: self.clone,
version: self.version,
comment: self.comment, comment: self.comment,
on_commit: self.on_commit, on_commit: self.on_commit,
on_cluster: self.on_cluster, on_cluster: self.on_cluster,
@ -448,6 +479,11 @@ impl CreateTableBuilder {
catalog_sync: self.catalog_sync, catalog_sync: self.catalog_sync,
storage_serialization_policy: self.storage_serialization_policy, storage_serialization_policy: self.storage_serialization_policy,
table_options: self.table_options, table_options: self.table_options,
target_lag: self.target_lag,
warehouse: self.warehouse,
refresh_mode: self.refresh_mode,
initialize: self.initialize,
require_user: self.require_user,
}) })
} }
} }
@ -468,6 +504,7 @@ impl TryFrom<Statement> for CreateTableBuilder {
transient, transient,
volatile, volatile,
iceberg, iceberg,
dynamic,
name, name,
columns, columns,
constraints, constraints,
@ -479,6 +516,7 @@ impl TryFrom<Statement> for CreateTableBuilder {
without_rowid, without_rowid,
like, like,
clone, clone,
version,
comment, comment,
on_commit, on_commit,
on_cluster, on_cluster,
@ -504,6 +542,11 @@ impl TryFrom<Statement> for CreateTableBuilder {
catalog_sync, catalog_sync,
storage_serialization_policy, storage_serialization_policy,
table_options, table_options,
target_lag,
warehouse,
refresh_mode,
initialize,
require_user,
}) => Ok(Self { }) => Ok(Self {
or_replace, or_replace,
temporary, temporary,
@ -511,6 +554,7 @@ impl TryFrom<Statement> for CreateTableBuilder {
global, global,
if_not_exists, if_not_exists,
transient, transient,
dynamic,
name, name,
columns, columns,
constraints, constraints,
@ -522,6 +566,7 @@ impl TryFrom<Statement> for CreateTableBuilder {
without_rowid, without_rowid,
like, like,
clone, clone,
version,
comment, comment,
on_commit, on_commit,
on_cluster, on_cluster,
@ -549,6 +594,11 @@ impl TryFrom<Statement> for CreateTableBuilder {
catalog_sync, catalog_sync,
storage_serialization_policy, storage_serialization_policy,
table_options, table_options,
target_lag,
warehouse,
refresh_mode,
initialize,
require_user,
}), }),
_ => Err(ParserError::ParserError(format!( _ => Err(ParserError::ParserError(format!(
"Expected create table statement, but received: {stmt}" "Expected create table statement, but received: {stmt}"

View file

@ -10555,6 +10555,48 @@ impl fmt::Display for CreateTableLike {
} }
} }
/// Specifies the refresh mode for the dynamic table.
///
/// [Snowflake](https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table)
#[derive(Debug, Copy, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
pub enum RefreshModeKind {
Auto,
Full,
Incremental,
}
impl fmt::Display for RefreshModeKind {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
RefreshModeKind::Auto => write!(f, "AUTO"),
RefreshModeKind::Full => write!(f, "FULL"),
RefreshModeKind::Incremental => write!(f, "INCREMENTAL"),
}
}
}
/// Specifies the behavior of the initial refresh of the dynamic table.
///
/// [Snowflake](https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table)
#[derive(Debug, Copy, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
pub enum InitializeKind {
OnCreate,
OnSchedule,
}
impl fmt::Display for InitializeKind {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
InitializeKind::OnCreate => write!(f, "ON_CREATE"),
InitializeKind::OnSchedule => write!(f, "ON_SCHEDULE"),
}
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::tokenizer::Location; use crate::tokenizer::Location;

View file

@ -1884,7 +1884,7 @@ impl fmt::Display for TableFactor {
write!(f, " WITH ({})", display_comma_separated(with_hints))?; write!(f, " WITH ({})", display_comma_separated(with_hints))?;
} }
if let Some(version) = version { if let Some(version) = version {
write!(f, "{version}")?; write!(f, " {version}")?;
} }
if let Some(TableSampleKind::AfterTableAlias(sample)) = sample { if let Some(TableSampleKind::AfterTableAlias(sample)) = sample {
write!(f, " {sample}")?; write!(f, " {sample}")?;
@ -2179,8 +2179,8 @@ pub enum TableVersion {
impl Display for TableVersion { impl Display for TableVersion {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self { match self {
TableVersion::ForSystemTimeAsOf(e) => write!(f, " FOR SYSTEM_TIME AS OF {e}")?, TableVersion::ForSystemTimeAsOf(e) => write!(f, "FOR SYSTEM_TIME AS OF {e}")?,
TableVersion::Function(func) => write!(f, " {func}")?, TableVersion::Function(func) => write!(f, "{func}")?,
} }
Ok(()) Ok(())
} }

View file

@ -579,6 +579,7 @@ impl Spanned for CreateTable {
temporary: _, // bool temporary: _, // bool
external: _, // bool external: _, // bool
global: _, // bool global: _, // bool
dynamic: _, // bool
if_not_exists: _, // bool if_not_exists: _, // bool
transient: _, // bool transient: _, // bool
volatile: _, // bool volatile: _, // bool
@ -619,6 +620,12 @@ impl Spanned for CreateTable {
catalog_sync: _, // todo, Snowflake specific catalog_sync: _, // todo, Snowflake specific
storage_serialization_policy: _, storage_serialization_policy: _,
table_options, table_options,
target_lag: _,
warehouse: _,
version: _,
refresh_mode: _,
initialize: _,
require_user: _,
} = self; } = self;
union_spans( union_spans(

View file

@ -29,8 +29,8 @@ use crate::ast::{
CatalogSyncNamespaceMode, ColumnOption, ColumnPolicy, ColumnPolicyProperty, ContactEntry, CatalogSyncNamespaceMode, ColumnOption, ColumnPolicy, ColumnPolicyProperty, ContactEntry,
CopyIntoSnowflakeKind, CreateTableLikeKind, DollarQuotedString, Ident, IdentityParameters, CopyIntoSnowflakeKind, CreateTableLikeKind, DollarQuotedString, Ident, IdentityParameters,
IdentityProperty, IdentityPropertyFormatKind, IdentityPropertyKind, IdentityPropertyOrder, IdentityProperty, IdentityPropertyFormatKind, IdentityPropertyKind, IdentityPropertyOrder,
ObjectName, ObjectNamePart, RowAccessPolicy, ShowObjects, SqlOption, Statement, InitializeKind, ObjectName, ObjectNamePart, RefreshModeKind, RowAccessPolicy, ShowObjects,
StorageSerializationPolicy, TagsColumnOption, WrappedCollection, SqlOption, Statement, StorageSerializationPolicy, TagsColumnOption, WrappedCollection,
}; };
use crate::dialect::{Dialect, Precedence}; use crate::dialect::{Dialect, Precedence};
use crate::keywords::Keyword; use crate::keywords::Keyword;
@ -235,6 +235,8 @@ impl Dialect for SnowflakeDialect {
_ => None, _ => None,
}; };
let dynamic = parser.parse_keyword(Keyword::DYNAMIC);
let mut temporary = false; let mut temporary = false;
let mut volatile = false; let mut volatile = false;
let mut transient = false; let mut transient = false;
@ -259,7 +261,7 @@ impl Dialect for SnowflakeDialect {
return Some(parse_create_stage(or_replace, temporary, parser)); return Some(parse_create_stage(or_replace, temporary, parser));
} else if parser.parse_keyword(Keyword::TABLE) { } else if parser.parse_keyword(Keyword::TABLE) {
return Some(parse_create_table( return Some(parse_create_table(
or_replace, global, temporary, volatile, transient, iceberg, parser, or_replace, global, temporary, volatile, transient, iceberg, dynamic, parser,
)); ));
} else if parser.parse_keyword(Keyword::DATABASE) { } else if parser.parse_keyword(Keyword::DATABASE) {
return Some(parse_create_database(or_replace, transient, parser)); return Some(parse_create_database(or_replace, transient, parser));
@ -614,6 +616,7 @@ fn parse_alter_session(parser: &mut Parser, set: bool) -> Result<Statement, Pars
/// Parse snowflake create table statement. /// Parse snowflake create table statement.
/// <https://docs.snowflake.com/en/sql-reference/sql/create-table> /// <https://docs.snowflake.com/en/sql-reference/sql/create-table>
/// <https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table> /// <https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table>
#[allow(clippy::too_many_arguments)]
pub fn parse_create_table( pub fn parse_create_table(
or_replace: bool, or_replace: bool,
global: Option<bool>, global: Option<bool>,
@ -621,6 +624,7 @@ pub fn parse_create_table(
volatile: bool, volatile: bool,
transient: bool, transient: bool,
iceberg: bool, iceberg: bool,
dynamic: bool,
parser: &mut Parser, parser: &mut Parser,
) -> Result<Statement, ParserError> { ) -> Result<Statement, ParserError> {
let if_not_exists = parser.parse_keywords(&[Keyword::IF, Keyword::NOT, Keyword::EXISTS]); let if_not_exists = parser.parse_keywords(&[Keyword::IF, Keyword::NOT, Keyword::EXISTS]);
@ -634,6 +638,7 @@ pub fn parse_create_table(
.volatile(volatile) .volatile(volatile)
.iceberg(iceberg) .iceberg(iceberg)
.global(global) .global(global)
.dynamic(dynamic)
.hive_formats(Some(Default::default())); .hive_formats(Some(Default::default()));
// Snowflake does not enforce order of the parameters in the statement. The parser needs to // Snowflake does not enforce order of the parameters in the statement. The parser needs to
@ -772,6 +777,49 @@ pub fn parse_create_table(
Keyword::IF if parser.parse_keywords(&[Keyword::NOT, Keyword::EXISTS]) => { Keyword::IF if parser.parse_keywords(&[Keyword::NOT, Keyword::EXISTS]) => {
builder = builder.if_not_exists(true); builder = builder.if_not_exists(true);
} }
Keyword::TARGET_LAG => {
parser.expect_token(&Token::Eq)?;
let target_lag = parser.parse_literal_string()?;
builder = builder.target_lag(Some(target_lag));
}
Keyword::WAREHOUSE => {
parser.expect_token(&Token::Eq)?;
let warehouse = parser.parse_identifier()?;
builder = builder.warehouse(Some(warehouse));
}
Keyword::AT | Keyword::BEFORE => {
parser.prev_token();
let version = parser.maybe_parse_table_version()?;
builder = builder.version(version);
}
Keyword::REFRESH_MODE => {
parser.expect_token(&Token::Eq)?;
let refresh_mode = match parser.parse_one_of_keywords(&[
Keyword::AUTO,
Keyword::FULL,
Keyword::INCREMENTAL,
]) {
Some(Keyword::AUTO) => Some(RefreshModeKind::Auto),
Some(Keyword::FULL) => Some(RefreshModeKind::Full),
Some(Keyword::INCREMENTAL) => Some(RefreshModeKind::Incremental),
_ => return parser.expected("AUTO, FULL or INCREMENTAL", next_token),
};
builder = builder.refresh_mode(refresh_mode);
}
Keyword::INITIALIZE => {
parser.expect_token(&Token::Eq)?;
let initialize = match parser
.parse_one_of_keywords(&[Keyword::ON_CREATE, Keyword::ON_SCHEDULE])
{
Some(Keyword::ON_CREATE) => Some(InitializeKind::OnCreate),
Some(Keyword::ON_SCHEDULE) => Some(InitializeKind::OnSchedule),
_ => return parser.expected("ON_CREATE or ON_SCHEDULE", next_token),
};
builder = builder.initialize(initialize);
}
Keyword::REQUIRE if parser.parse_keyword(Keyword::USER) => {
builder = builder.require_user(true);
}
_ => { _ => {
return parser.expected("end of statement", next_token); return parser.expected("end of statement", next_token);
} }
@ -782,21 +830,9 @@ pub fn parse_create_table(
builder = builder.columns(columns).constraints(constraints); builder = builder.columns(columns).constraints(constraints);
} }
Token::EOF => { Token::EOF => {
if !builder.validate_schema_info() {
return Err(ParserError::ParserError(
"unexpected end of input".to_string(),
));
}
break; break;
} }
Token::SemiColon => { Token::SemiColon => {
if !builder.validate_schema_info() {
return Err(ParserError::ParserError(
"unexpected end of input".to_string(),
));
}
parser.prev_token(); parser.prev_token();
break; break;
} }

View file

@ -300,6 +300,7 @@ define_keywords!(
DOMAIN, DOMAIN,
DOUBLE, DOUBLE,
DOW, DOW,
DOWNSTREAM,
DOY, DOY,
DROP, DROP,
DRY, DRY,
@ -447,10 +448,12 @@ define_keywords!(
INCLUDE_NULL_VALUES, INCLUDE_NULL_VALUES,
INCLUDING, INCLUDING,
INCREMENT, INCREMENT,
INCREMENTAL,
INDEX, INDEX,
INDICATOR, INDICATOR,
INHERIT, INHERIT,
INHERITS, INHERITS,
INITIALIZE,
INITIALLY, INITIALLY,
INNER, INNER,
INOUT, INOUT,
@ -643,6 +646,8 @@ define_keywords!(
ON, ON,
ONE, ONE,
ONLY, ONLY,
ON_CREATE,
ON_SCHEDULE,
OPEN, OPEN,
OPENJSON, OPENJSON,
OPERATE, OPERATE,
@ -744,6 +749,7 @@ define_keywords!(
REF, REF,
REFERENCES, REFERENCES,
REFERENCING, REFERENCING,
REFRESH_MODE,
REGCLASS, REGCLASS,
REGEXP, REGEXP,
REGR_AVGX, REGR_AVGX,
@ -770,6 +776,7 @@ define_keywords!(
REPLICA, REPLICA,
REPLICATE, REPLICATE,
REPLICATION, REPLICATION,
REQUIRE,
RESET, RESET,
RESOLVE, RESOLVE,
RESOURCE, RESOURCE,
@ -907,6 +914,7 @@ define_keywords!(
TABLESPACE, TABLESPACE,
TAG, TAG,
TARGET, TARGET,
TARGET_LAG,
TASK, TASK,
TBLPROPERTIES, TBLPROPERTIES,
TEMP, TEMP,

View file

@ -700,6 +700,7 @@ fn test_duckdb_union_datatype() {
transient: Default::default(), transient: Default::default(),
volatile: Default::default(), volatile: Default::default(),
iceberg: Default::default(), iceberg: Default::default(),
dynamic: Default::default(),
name: ObjectName::from(vec!["tbl1".into()]), name: ObjectName::from(vec!["tbl1".into()]),
columns: vec![ columns: vec![
ColumnDef { ColumnDef {
@ -774,7 +775,13 @@ fn test_duckdb_union_datatype() {
catalog: Default::default(), catalog: Default::default(),
catalog_sync: Default::default(), catalog_sync: Default::default(),
storage_serialization_policy: Default::default(), storage_serialization_policy: Default::default(),
table_options: CreateTableOptions::None table_options: CreateTableOptions::None,
target_lag: None,
warehouse: None,
version: None,
refresh_mode: None,
initialize: None,
require_user: Default::default(),
}), }),
stmt stmt
); );

View file

@ -1848,6 +1848,7 @@ fn parse_create_table_with_valid_options() {
temporary: false, temporary: false,
external: false, external: false,
global: None, global: None,
dynamic: false,
if_not_exists: false, if_not_exists: false,
transient: false, transient: false,
volatile: false, volatile: false,
@ -1924,7 +1925,13 @@ fn parse_create_table_with_valid_options() {
catalog: None, catalog: None,
catalog_sync: None, catalog_sync: None,
storage_serialization_policy: None, storage_serialization_policy: None,
table_options: CreateTableOptions::With(with_options) table_options: CreateTableOptions::With(with_options),
target_lag: None,
warehouse: None,
version: None,
refresh_mode: None,
initialize: None,
require_user: false,
}) })
); );
} }
@ -2031,6 +2038,7 @@ fn parse_create_table_with_identity_column() {
temporary: false, temporary: false,
external: false, external: false,
global: None, global: None,
dynamic: false,
if_not_exists: false, if_not_exists: false,
transient: false, transient: false,
volatile: false, volatile: false,
@ -2088,7 +2096,13 @@ fn parse_create_table_with_identity_column() {
catalog: None, catalog: None,
catalog_sync: None, catalog_sync: None,
storage_serialization_policy: None, storage_serialization_policy: None,
table_options: CreateTableOptions::None table_options: CreateTableOptions::None,
target_lag: None,
warehouse: None,
version: None,
refresh_mode: None,
initialize: None,
require_user: false,
}), }),
); );
} }

View file

@ -5932,6 +5932,7 @@ fn parse_trigger_related_functions() {
temporary: false, temporary: false,
external: false, external: false,
global: None, global: None,
dynamic: false,
if_not_exists: false, if_not_exists: false,
transient: false, transient: false,
volatile: false, volatile: false,
@ -5997,7 +5998,13 @@ fn parse_trigger_related_functions() {
catalog: None, catalog: None,
catalog_sync: None, catalog_sync: None,
storage_serialization_policy: None, storage_serialization_policy: None,
table_options: CreateTableOptions::None table_options: CreateTableOptions::None,
target_lag: None,
warehouse: None,
version: None,
refresh_mode: None,
initialize: None,
require_user: false,
} }
); );

View file

@ -528,23 +528,6 @@ fn test_snowflake_create_table_comment() {
} }
} }
#[test]
fn test_snowflake_create_table_incomplete_statement() {
assert_eq!(
snowflake().parse_sql_statements("CREATE TABLE my_table"),
Err(ParserError::ParserError(
"unexpected end of input".to_string()
))
);
assert_eq!(
snowflake().parse_sql_statements("CREATE TABLE my_table; (c int)"),
Err(ParserError::ParserError(
"unexpected end of input".to_string()
))
);
}
#[test] #[test]
fn test_snowflake_single_line_tokenize() { fn test_snowflake_single_line_tokenize() {
let sql = "CREATE TABLE# this is a comment \ntable_1"; let sql = "CREATE TABLE# this is a comment \ntable_1";
@ -923,8 +906,8 @@ fn test_snowflake_create_table_with_several_column_options() {
#[test] #[test]
fn test_snowflake_create_iceberg_table_all_options() { fn test_snowflake_create_iceberg_table_all_options() {
match snowflake().verified_stmt("CREATE ICEBERG TABLE my_table (a INT, b INT) \ match snowflake().verified_stmt("CREATE ICEBERG TABLE my_table (a INT, b INT) \
CLUSTER BY (a, b) EXTERNAL_VOLUME = 'volume' CATALOG = 'SNOWFLAKE' BASE_LOCATION = 'relative/path' CATALOG_SYNC = 'OPEN_CATALOG' \ CLUSTER BY (a, b) EXTERNAL_VOLUME='volume' CATALOG='SNOWFLAKE' BASE_LOCATION='relative/path' CATALOG_SYNC='OPEN_CATALOG' \
STORAGE_SERIALIZATION_POLICY = COMPATIBLE COPY GRANTS CHANGE_TRACKING=TRUE DATA_RETENTION_TIME_IN_DAYS=5 MAX_DATA_EXTENSION_TIME_IN_DAYS=10 \ STORAGE_SERIALIZATION_POLICY=COMPATIBLE COPY GRANTS CHANGE_TRACKING=TRUE DATA_RETENTION_TIME_IN_DAYS=5 MAX_DATA_EXTENSION_TIME_IN_DAYS=10 \
WITH AGGREGATION POLICY policy_name WITH ROW ACCESS POLICY policy_name ON (a) WITH TAG (A='TAG A', B='TAG B')") { WITH AGGREGATION POLICY policy_name WITH ROW ACCESS POLICY policy_name ON (a) WITH TAG (A='TAG A', B='TAG B')") {
Statement::CreateTable(CreateTable { Statement::CreateTable(CreateTable {
name, cluster_by, base_location, name, cluster_by, base_location,
@ -972,7 +955,7 @@ fn test_snowflake_create_iceberg_table_all_options() {
#[test] #[test]
fn test_snowflake_create_iceberg_table() { fn test_snowflake_create_iceberg_table() {
match snowflake() match snowflake()
.verified_stmt("CREATE ICEBERG TABLE my_table (a INT) BASE_LOCATION = 'relative_path'") .verified_stmt("CREATE ICEBERG TABLE my_table (a INT) BASE_LOCATION='relative_path'")
{ {
Statement::CreateTable(CreateTable { Statement::CreateTable(CreateTable {
name, name,
@ -1019,27 +1002,6 @@ fn test_snowflake_create_table_trailing_options() {
.unwrap(); .unwrap();
} }
#[test]
fn test_snowflake_create_table_valid_schema_info() {
// Validate there's exactly one source of information on the schema of the new table
assert_eq!(
snowflake()
.parse_sql_statements("CREATE TABLE dst")
.is_err(),
true
);
assert_eq!(
snowflake().parse_sql_statements("CREATE OR REPLACE TEMP TABLE dst LIKE src AS (SELECT * FROM CUSTOMERS) ON COMMIT PRESERVE ROWS").is_err(),
true
);
assert_eq!(
snowflake()
.parse_sql_statements("CREATE OR REPLACE TEMP TABLE dst CLONE customers LIKE customer2")
.is_err(),
true
);
}
#[test] #[test]
fn parse_sf_create_or_replace_view_with_comment_missing_equal() { fn parse_sf_create_or_replace_view_with_comment_missing_equal() {
assert!(snowflake_and_generic() assert!(snowflake_and_generic()
@ -1104,6 +1066,79 @@ fn parse_sf_create_table_or_view_with_dollar_quoted_comment() {
); );
} }
#[test]
fn parse_create_dynamic_table() {
snowflake().verified_stmt(r#"CREATE OR REPLACE DYNAMIC TABLE my_dynamic_table TARGET_LAG='20 minutes' WAREHOUSE=mywh AS SELECT product_id, product_name FROM staging_table"#);
snowflake().verified_stmt(concat!(
"CREATE DYNAMIC ICEBERG TABLE my_dynamic_table (date TIMESTAMP_NTZ, id NUMBER, content STRING)",
" EXTERNAL_VOLUME='my_external_volume'",
" CATALOG='SNOWFLAKE'",
" BASE_LOCATION='my_iceberg_table'",
" TARGET_LAG='20 minutes'",
" WAREHOUSE=mywh",
" AS SELECT product_id, product_name FROM staging_table"
));
snowflake().verified_stmt(concat!(
"CREATE DYNAMIC TABLE my_dynamic_table (date TIMESTAMP_NTZ, id NUMBER, content VARIANT)",
" CLUSTER BY (date, id)",
" TARGET_LAG='20 minutes'",
" WAREHOUSE=mywh",
" AS SELECT product_id, product_name FROM staging_table"
));
snowflake().verified_stmt(concat!(
"CREATE DYNAMIC TABLE my_cloned_dynamic_table",
" CLONE my_dynamic_table",
" AT(TIMESTAMP => TO_TIMESTAMP_TZ('04/05/2013 01:02:03', 'mm/dd/yyyy hh24:mi:ss'))"
));
snowflake().verified_stmt(concat!(
"CREATE DYNAMIC TABLE my_cloned_dynamic_table",
" CLONE my_dynamic_table",
" BEFORE(OFFSET => TO_TIMESTAMP_TZ('04/05/2013 01:02:03', 'mm/dd/yyyy hh24:mi:ss'))"
));
snowflake().verified_stmt(concat!(
"CREATE DYNAMIC TABLE my_dynamic_table",
" TARGET_LAG='DOWNSTREAM'",
" WAREHOUSE=mywh",
" INITIALIZE=ON_SCHEDULE",
" REQUIRE USER",
" AS SELECT product_id, product_name FROM staging_table"
));
snowflake().verified_stmt(concat!(
"CREATE DYNAMIC TABLE my_dynamic_table",
" TARGET_LAG='DOWNSTREAM'",
" WAREHOUSE=mywh",
" REFRESH_MODE=AUTO",
" INITIALIZE=ON_SCHEDULE",
" REQUIRE USER",
" AS SELECT product_id, product_name FROM staging_table"
));
snowflake().verified_stmt(concat!(
"CREATE DYNAMIC TABLE my_dynamic_table",
" TARGET_LAG='DOWNSTREAM'",
" WAREHOUSE=mywh",
" REFRESH_MODE=FULL",
" INITIALIZE=ON_SCHEDULE",
" REQUIRE USER",
" AS SELECT product_id, product_name FROM staging_table"
));
snowflake().verified_stmt(concat!(
"CREATE DYNAMIC TABLE my_dynamic_table",
" TARGET_LAG='DOWNSTREAM'",
" WAREHOUSE=mywh",
" REFRESH_MODE=INCREMENTAL",
" INITIALIZE=ON_SCHEDULE",
" REQUIRE USER",
" AS SELECT product_id, product_name FROM staging_table"
));
}
#[test] #[test]
fn test_sf_derived_table_in_parenthesis() { fn test_sf_derived_table_in_parenthesis() {
// Nesting a subquery in an extra set of parentheses is non-standard, // Nesting a subquery in an extra set of parentheses is non-standard,
@ -4516,9 +4551,6 @@ fn test_snowflake_identifier_function() {
.is_err(), .is_err(),
true true
); );
snowflake().verified_stmt("GRANT ROLE IDENTIFIER('AAA') TO USER IDENTIFIER('AAA')");
snowflake().verified_stmt("REVOKE ROLE IDENTIFIER('AAA') FROM USER IDENTIFIER('AAA')");
} }
#[test] #[test]