Add support for Create Iceberg Table statement for Snowflake parser (#1664)

This commit is contained in:
Denys Tsomenko 2025-01-20 22:39:44 +02:00 committed by GitHub
parent 183274e274
commit c7c0de6551
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 296 additions and 5 deletions

View file

@ -36,7 +36,8 @@ use super::{
CommentDef, Expr, FileFormat, FromTable, HiveDistributionStyle, HiveFormat, HiveIOFormat,
HiveRowFormat, Ident, InsertAliases, MysqlInsertPriority, ObjectName, OnCommit, OnInsert,
OneOrManyWithParens, OrderByExpr, Query, RowAccessPolicy, SelectItem, Setting, SqlOption,
SqliteOnConflict, TableEngine, TableObject, TableWithJoins, Tag, WrappedCollection,
SqliteOnConflict, StorageSerializationPolicy, TableEngine, TableObject, TableWithJoins, Tag,
WrappedCollection,
};
/// CREATE INDEX statement.
@ -117,6 +118,7 @@ pub struct CreateTable {
pub if_not_exists: bool,
pub transient: bool,
pub volatile: bool,
pub iceberg: bool,
/// Table name
#[cfg_attr(feature = "visitor", visit(with = "visit_relation"))]
pub name: ObjectName,
@ -192,6 +194,21 @@ pub struct CreateTable {
/// Snowflake "WITH TAG" clause
/// <https://docs.snowflake.com/en/sql-reference/sql/create-table>
pub with_tags: Option<Vec<Tag>>,
/// Snowflake "EXTERNAL_VOLUME" clause for Iceberg tables
/// <https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table>
pub external_volume: Option<String>,
/// Snowflake "BASE_LOCATION" clause for Iceberg tables
/// <https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table>
pub base_location: Option<String>,
/// Snowflake "CATALOG" clause for Iceberg tables
/// <https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table>
pub catalog: Option<String>,
/// Snowflake "CATALOG_SYNC" clause for Iceberg tables
/// <https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table>
pub catalog_sync: Option<String>,
/// Snowflake "STORAGE_SERIALIZATION_POLICY" clause for Iceberg tables
/// <https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table>
pub storage_serialization_policy: Option<StorageSerializationPolicy>,
}
impl Display for CreateTable {
@ -205,7 +222,7 @@ impl Display for CreateTable {
// `CREATE TABLE t (a INT) AS SELECT a from t2`
write!(
f,
"CREATE {or_replace}{external}{global}{temporary}{transient}{volatile}TABLE {if_not_exists}{name}",
"CREATE {or_replace}{external}{global}{temporary}{transient}{volatile}{iceberg}TABLE {if_not_exists}{name}",
or_replace = if self.or_replace { "OR REPLACE " } else { "" },
external = if self.external { "EXTERNAL " } else { "" },
global = self.global
@ -221,6 +238,8 @@ impl Display for CreateTable {
temporary = if self.temporary { "TEMPORARY " } else { "" },
transient = if self.transient { "TRANSIENT " } else { "" },
volatile = if self.volatile { "VOLATILE " } else { "" },
// Only for Snowflake
iceberg = if self.iceberg { "ICEBERG " } else { "" },
name = self.name,
)?;
if let Some(on_cluster) = &self.on_cluster {
@ -382,6 +401,31 @@ impl Display for CreateTable {
)?;
}
if let Some(external_volume) = self.external_volume.as_ref() {
write!(f, " EXTERNAL_VOLUME = '{external_volume}'")?;
}
if let Some(catalog) = self.catalog.as_ref() {
write!(f, " CATALOG = '{catalog}'")?;
}
if self.iceberg {
if let Some(base_location) = self.base_location.as_ref() {
write!(f, " BASE_LOCATION = '{base_location}'")?;
}
}
if let Some(catalog_sync) = self.catalog_sync.as_ref() {
write!(f, " CATALOG_SYNC = '{catalog_sync}'")?;
}
if let Some(storage_serialization_policy) = self.storage_serialization_policy.as_ref() {
write!(
f,
" STORAGE_SERIALIZATION_POLICY = {storage_serialization_policy}"
)?;
}
if self.copy_grants {
write!(f, " COPY GRANTS")?;
}

View file

@ -28,7 +28,7 @@ use super::super::dml::CreateTable;
use crate::ast::{
ClusteredBy, ColumnDef, CommentDef, Expr, FileFormat, HiveDistributionStyle, HiveFormat, Ident,
ObjectName, OnCommit, OneOrManyWithParens, Query, RowAccessPolicy, SqlOption, Statement,
TableConstraint, TableEngine, Tag, WrappedCollection,
StorageSerializationPolicy, TableConstraint, TableEngine, Tag, WrappedCollection,
};
use crate::parser::ParserError;
@ -71,6 +71,7 @@ pub struct CreateTableBuilder {
pub if_not_exists: bool,
pub transient: bool,
pub volatile: bool,
pub iceberg: bool,
pub name: ObjectName,
pub columns: Vec<ColumnDef>,
pub constraints: Vec<TableConstraint>,
@ -107,6 +108,11 @@ pub struct CreateTableBuilder {
pub with_aggregation_policy: Option<ObjectName>,
pub with_row_access_policy: Option<RowAccessPolicy>,
pub with_tags: Option<Vec<Tag>>,
pub base_location: Option<String>,
pub external_volume: Option<String>,
pub catalog: Option<String>,
pub catalog_sync: Option<String>,
pub storage_serialization_policy: Option<StorageSerializationPolicy>,
}
impl CreateTableBuilder {
@ -119,6 +125,7 @@ impl CreateTableBuilder {
if_not_exists: false,
transient: false,
volatile: false,
iceberg: false,
name,
columns: vec![],
constraints: vec![],
@ -155,6 +162,11 @@ impl CreateTableBuilder {
with_aggregation_policy: None,
with_row_access_policy: None,
with_tags: None,
base_location: None,
external_volume: None,
catalog: None,
catalog_sync: None,
storage_serialization_policy: None,
}
}
pub fn or_replace(mut self, or_replace: bool) -> Self {
@ -192,6 +204,11 @@ impl CreateTableBuilder {
self
}
pub fn iceberg(mut self, iceberg: bool) -> Self {
self.iceberg = iceberg;
self
}
pub fn columns(mut self, columns: Vec<ColumnDef>) -> Self {
self.columns = columns;
self
@ -371,6 +388,34 @@ impl CreateTableBuilder {
self
}
pub fn base_location(mut self, base_location: Option<String>) -> Self {
self.base_location = base_location;
self
}
pub fn external_volume(mut self, external_volume: Option<String>) -> Self {
self.external_volume = external_volume;
self
}
pub fn catalog(mut self, catalog: Option<String>) -> Self {
self.catalog = catalog;
self
}
pub fn catalog_sync(mut self, catalog_sync: Option<String>) -> Self {
self.catalog_sync = catalog_sync;
self
}
pub fn storage_serialization_policy(
mut self,
storage_serialization_policy: Option<StorageSerializationPolicy>,
) -> Self {
self.storage_serialization_policy = storage_serialization_policy;
self
}
pub fn build(self) -> Statement {
Statement::CreateTable(CreateTable {
or_replace: self.or_replace,
@ -380,6 +425,7 @@ impl CreateTableBuilder {
if_not_exists: self.if_not_exists,
transient: self.transient,
volatile: self.volatile,
iceberg: self.iceberg,
name: self.name,
columns: self.columns,
constraints: self.constraints,
@ -416,6 +462,11 @@ impl CreateTableBuilder {
with_aggregation_policy: self.with_aggregation_policy,
with_row_access_policy: self.with_row_access_policy,
with_tags: self.with_tags,
base_location: self.base_location,
external_volume: self.external_volume,
catalog: self.catalog,
catalog_sync: self.catalog_sync,
storage_serialization_policy: self.storage_serialization_policy,
})
}
}
@ -435,6 +486,7 @@ impl TryFrom<Statement> for CreateTableBuilder {
if_not_exists,
transient,
volatile,
iceberg,
name,
columns,
constraints,
@ -471,6 +523,11 @@ impl TryFrom<Statement> for CreateTableBuilder {
with_aggregation_policy,
with_row_access_policy,
with_tags,
base_location,
external_volume,
catalog,
catalog_sync,
storage_serialization_policy,
}) => Ok(Self {
or_replace,
temporary,
@ -505,6 +562,7 @@ impl TryFrom<Statement> for CreateTableBuilder {
clustered_by,
options,
strict,
iceberg,
copy_grants,
enable_schema_evolution,
change_tracking,
@ -515,6 +573,11 @@ impl TryFrom<Statement> for CreateTableBuilder {
with_row_access_policy,
with_tags,
volatile,
base_location,
external_volume,
catalog,
catalog_sync,
storage_serialization_policy,
}),
_ => Err(ParserError::ParserError(format!(
"Expected create table statement, but received: {stmt}"

View file

@ -8396,6 +8396,29 @@ impl fmt::Display for SessionParamValue {
}
}
/// Snowflake StorageSerializationPolicy for Iceberg Tables
/// ```sql
/// [ STORAGE_SERIALIZATION_POLICY = { COMPATIBLE | OPTIMIZED } ]
/// ```
///
/// <https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table>
#[derive(Debug, Copy, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
pub enum StorageSerializationPolicy {
    /// Serialized as the `COMPATIBLE` keyword.
    Compatible,
    /// Serialized as the `OPTIMIZED` keyword.
    Optimized,
}

impl Display for StorageSerializationPolicy {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Emit the exact SQL keyword for the chosen policy.
        f.write_str(match self {
            Self::Compatible => "COMPATIBLE",
            Self::Optimized => "OPTIMIZED",
        })
    }
}
#[cfg(test)]
mod tests {
use super::*;

View file

@ -532,6 +532,7 @@ impl Spanned for CreateTable {
if_not_exists: _, // bool
transient: _, // bool
volatile: _, // bool
iceberg: _, // bool, Snowflake specific
name,
columns,
constraints,
@ -568,6 +569,11 @@ impl Spanned for CreateTable {
with_aggregation_policy: _, // todo, Snowflake specific
with_row_access_policy: _, // todo, Snowflake specific
with_tags: _, // todo, Snowflake specific
external_volume: _, // todo, Snowflake specific
base_location: _, // todo, Snowflake specific
catalog: _, // todo, Snowflake specific
catalog_sync: _, // todo, Snowflake specific
storage_serialization_policy: _, // todo, Snowflake specific
} = self;
union_spans(

View file

@ -37,6 +37,7 @@ use alloc::string::String;
use alloc::vec::Vec;
#[cfg(not(feature = "std"))]
use alloc::{format, vec};
use sqlparser::ast::StorageSerializationPolicy;
use super::keywords::RESERVED_FOR_IDENTIFIER;
@ -130,16 +131,19 @@ impl Dialect for SnowflakeDialect {
let mut temporary = false;
let mut volatile = false;
let mut transient = false;
let mut iceberg = false;
match parser.parse_one_of_keywords(&[
Keyword::TEMP,
Keyword::TEMPORARY,
Keyword::VOLATILE,
Keyword::TRANSIENT,
Keyword::ICEBERG,
]) {
Some(Keyword::TEMP | Keyword::TEMPORARY) => temporary = true,
Some(Keyword::VOLATILE) => volatile = true,
Some(Keyword::TRANSIENT) => transient = true,
Some(Keyword::ICEBERG) => iceberg = true,
_ => {}
}
@ -148,7 +152,7 @@ impl Dialect for SnowflakeDialect {
return Some(parse_create_stage(or_replace, temporary, parser));
} else if parser.parse_keyword(Keyword::TABLE) {
return Some(parse_create_table(
or_replace, global, temporary, volatile, transient, parser,
or_replace, global, temporary, volatile, transient, iceberg, parser,
));
} else {
// need to go back with the cursor
@ -325,12 +329,14 @@ fn parse_file_staging_command(kw: Keyword, parser: &mut Parser) -> Result<Statem
/// Parse snowflake create table statement.
/// <https://docs.snowflake.com/en/sql-reference/sql/create-table>
/// <https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table>
pub fn parse_create_table(
or_replace: bool,
global: Option<bool>,
temporary: bool,
volatile: bool,
transient: bool,
iceberg: bool,
parser: &mut Parser,
) -> Result<Statement, ParserError> {
let if_not_exists = parser.parse_keywords(&[Keyword::IF, Keyword::NOT, Keyword::EXISTS]);
@ -342,6 +348,7 @@ pub fn parse_create_table(
.temporary(temporary)
.transient(transient)
.volatile(volatile)
.iceberg(iceberg)
.global(global)
.hive_formats(Some(Default::default()));
@ -468,6 +475,28 @@ pub fn parse_create_table(
let on_commit = Some(parser.parse_create_table_on_commit()?);
builder = builder.on_commit(on_commit);
}
Keyword::EXTERNAL_VOLUME => {
parser.expect_token(&Token::Eq)?;
builder.external_volume = Some(parser.parse_literal_string()?);
}
Keyword::CATALOG => {
parser.expect_token(&Token::Eq)?;
builder.catalog = Some(parser.parse_literal_string()?);
}
Keyword::BASE_LOCATION => {
parser.expect_token(&Token::Eq)?;
builder.base_location = Some(parser.parse_literal_string()?);
}
Keyword::CATALOG_SYNC => {
parser.expect_token(&Token::Eq)?;
builder.catalog_sync = Some(parser.parse_literal_string()?);
}
Keyword::STORAGE_SERIALIZATION_POLICY => {
parser.expect_token(&Token::Eq)?;
builder.storage_serialization_policy =
Some(parse_storage_serialization_policy(parser)?);
}
_ => {
return parser.expected("end of statement", next_token);
}
@ -502,9 +531,29 @@ pub fn parse_create_table(
}
}
if iceberg && builder.base_location.is_none() {
return Err(ParserError::ParserError(
"BASE_LOCATION is required for ICEBERG tables".to_string(),
));
}
Ok(builder.build())
}
/// Parses the value of a Snowflake `STORAGE_SERIALIZATION_POLICY` option:
/// either the `COMPATIBLE` or the `OPTIMIZED` keyword.
///
/// Consumes one token; any other token yields a parser error naming the
/// expected `storage_serialization_policy` value.
pub fn parse_storage_serialization_policy(
    parser: &mut Parser,
) -> Result<StorageSerializationPolicy, ParserError> {
    let next_token = parser.next_token();
    if let Token::Word(w) = &next_token.token {
        match w.keyword {
            Keyword::COMPATIBLE => return Ok(StorageSerializationPolicy::Compatible),
            Keyword::OPTIMIZED => return Ok(StorageSerializationPolicy::Optimized),
            _ => {}
        }
    }
    parser.expected("storage_serialization_policy", next_token)
}
pub fn parse_create_stage(
or_replace: bool,
temporary: bool,

View file

@ -121,6 +121,7 @@ define_keywords!(
AVRO,
BACKWARD,
BASE64,
BASE_LOCATION,
BEFORE,
BEGIN,
BEGIN_FRAME,
@ -158,6 +159,7 @@ define_keywords!(
CASES,
CAST,
CATALOG,
CATALOG_SYNC,
CATCH,
CEIL,
CEILING,
@ -191,6 +193,7 @@ define_keywords!(
COMMENT,
COMMIT,
COMMITTED,
COMPATIBLE,
COMPRESSION,
COMPUTE,
CONCURRENTLY,
@ -329,6 +332,7 @@ define_keywords!(
EXTENDED,
EXTENSION,
EXTERNAL,
EXTERNAL_VOLUME,
EXTRACT,
FAIL,
FAILOVER,
@ -397,6 +401,7 @@ define_keywords!(
HOSTS,
HOUR,
HOURS,
ICEBERG,
ID,
IDENTITY,
IDENTITY_INSERT,
@ -599,6 +604,7 @@ define_keywords!(
OPERATOR,
OPTIMIZATION,
OPTIMIZE,
OPTIMIZED,
OPTIMIZER_COSTS,
OPTION,
OPTIONS,
@ -806,6 +812,7 @@ define_keywords!(
STDOUT,
STEP,
STORAGE_INTEGRATION,
STORAGE_SERIALIZATION_POLICY,
STORED,
STRICT,
STRING,

View file

@ -660,6 +660,7 @@ fn test_duckdb_union_datatype() {
if_not_exists: Default::default(),
transient: Default::default(),
volatile: Default::default(),
iceberg: Default::default(),
name: ObjectName(vec!["tbl1".into()]),
columns: vec![
ColumnDef {
@ -737,7 +738,12 @@ fn test_duckdb_union_datatype() {
default_ddl_collation: Default::default(),
with_aggregation_policy: Default::default(),
with_row_access_policy: Default::default(),
with_tags: Default::default()
with_tags: Default::default(),
base_location: Default::default(),
external_volume: Default::default(),
catalog: Default::default(),
catalog_sync: Default::default(),
storage_serialization_policy: Default::default(),
}),
stmt
);

View file

@ -1539,6 +1539,7 @@ fn parse_create_table_with_valid_options() {
clustered_by: None,
options: None,
strict: false,
iceberg: false,
copy_grants: false,
enable_schema_evolution: None,
change_tracking: None,
@ -1548,6 +1549,11 @@ fn parse_create_table_with_valid_options() {
with_aggregation_policy: None,
with_row_access_policy: None,
with_tags: None,
base_location: None,
external_volume: None,
catalog: None,
catalog_sync: None,
storage_serialization_policy: None,
})
);
}
@ -1641,6 +1647,7 @@ fn parse_create_table_with_identity_column() {
if_not_exists: false,
transient: false,
volatile: false,
iceberg: false,
name: ObjectName(vec![Ident {
value: "mytable".to_string(),
quote_style: None,
@ -1695,6 +1702,11 @@ fn parse_create_table_with_identity_column() {
with_aggregation_policy: None,
with_row_access_policy: None,
with_tags: None,
base_location: None,
external_volume: None,
catalog: None,
catalog_sync: None,
storage_serialization_policy: None,
}),
);
}

View file

@ -5043,6 +5043,7 @@ fn parse_trigger_related_functions() {
if_not_exists: false,
transient: false,
volatile: false,
iceberg: false,
name: ObjectName(vec![Ident::new("emp")]),
columns: vec![
ColumnDef {
@ -5109,6 +5110,11 @@ fn parse_trigger_related_functions() {
with_aggregation_policy: None,
with_row_access_policy: None,
with_tags: None,
base_location: None,
external_volume: None,
catalog: None,
catalog_sync: None,
storage_serialization_policy: None,
}
);

View file

@ -849,6 +849,81 @@ fn test_snowflake_create_table_with_several_column_options() {
}
}
#[test]
fn test_snowflake_create_iceberg_table_all_options() {
    // Round-trips (parses, re-serializes, and compares via `verified_stmt`) a
    // CREATE ICEBERG TABLE statement combining every Iceberg-specific clause
    // (EXTERNAL_VOLUME, CATALOG, BASE_LOCATION, CATALOG_SYNC,
    // STORAGE_SERIALIZATION_POLICY) with general Snowflake table options, then
    // checks each parsed field individually.
    match snowflake().verified_stmt("CREATE ICEBERG TABLE my_table (a INT, b INT) \
    CLUSTER BY (a, b) EXTERNAL_VOLUME = 'volume' CATALOG = 'SNOWFLAKE' BASE_LOCATION = 'relative/path' CATALOG_SYNC = 'OPEN_CATALOG' \
    STORAGE_SERIALIZATION_POLICY = COMPATIBLE COPY GRANTS CHANGE_TRACKING=TRUE DATA_RETENTION_TIME_IN_DAYS=5 MAX_DATA_EXTENSION_TIME_IN_DAYS=10 \
    WITH AGGREGATION POLICY policy_name WITH ROW ACCESS POLICY policy_name ON (a) WITH TAG (A='TAG A', B='TAG B')") {
        Statement::CreateTable(CreateTable {
            name, cluster_by, base_location,
            external_volume, catalog, catalog_sync,
            storage_serialization_policy, change_tracking,
            copy_grants, data_retention_time_in_days,
            max_data_extension_time_in_days, with_aggregation_policy,
            with_row_access_policy, with_tags, ..
        }) => {
            assert_eq!("my_table", name.to_string());
            // CLUSTER BY (a, b) parses into a parenthesized identifier list.
            assert_eq!(
                Some(WrappedCollection::Parentheses(vec![
                    Ident::new("a"),
                    Ident::new("b"),
                ])),
                cluster_by
            );
            assert_eq!("relative/path", base_location.unwrap());
            assert_eq!("volume", external_volume.unwrap());
            assert_eq!("SNOWFLAKE", catalog.unwrap());
            assert_eq!("OPEN_CATALOG", catalog_sync.unwrap());
            assert_eq!(StorageSerializationPolicy::Compatible, storage_serialization_policy.unwrap());
            assert!(change_tracking.unwrap());
            assert!(copy_grants);
            assert_eq!(Some(5), data_retention_time_in_days);
            assert_eq!(Some(10), max_data_extension_time_in_days);
            // Policy clauses are compared via their Display form rather than
            // constructing the AST nodes by hand.
            assert_eq!(
                Some("WITH ROW ACCESS POLICY policy_name ON (a)".to_string()),
                with_row_access_policy.map(|policy| policy.to_string())
            );
            assert_eq!(
                Some("policy_name".to_string()),
                with_aggregation_policy.map(|name| name.to_string())
            );
            assert_eq!(Some(vec![
                Tag::new("A".into(), "TAG A".into()),
                Tag::new("B".into(), "TAG B".into()),
            ]), with_tags);
        }
        _ => unreachable!(),
    }
}
#[test]
fn test_snowflake_create_iceberg_table() {
    // Minimal Iceberg table: only the mandatory BASE_LOCATION clause is given.
    let sql = "CREATE ICEBERG TABLE my_table (a INT) BASE_LOCATION = 'relative_path'";
    match snowflake().verified_stmt(sql) {
        Statement::CreateTable(CreateTable {
            name,
            base_location,
            ..
        }) => {
            assert_eq!(name.to_string(), "my_table");
            assert_eq!(base_location.as_deref(), Some("relative_path"));
        }
        _ => unreachable!(),
    }
}
#[test]
fn test_snowflake_create_iceberg_table_without_location() {
    // Omitting BASE_LOCATION must be rejected: it is mandatory for Iceberg tables.
    let err = snowflake()
        .parse_sql_statements("CREATE ICEBERG TABLE my_table (a INT)")
        .unwrap_err();
    assert_eq!(
        err,
        ParserError::ParserError("BASE_LOCATION is required for ICEBERG tables".to_string())
    );
}
#[test]
fn parse_sf_create_or_replace_view_with_comment_missing_equal() {
assert!(snowflake_and_generic()