support CREATE/DROP STAGE for Snowflake (#833)

Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
This commit is contained in:
pawel.leszczynski 2023-03-26 13:31:37 +02:00 committed by GitHub
parent a1b7341b87
commit 79c7ac73df
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 628 additions and 1 deletions

View file

@ -1 +1,2 @@
pub mod stmt_create_table;
pub mod stmt_data_loading;

View file

@ -0,0 +1,123 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! AST types specific to data loading and unloading syntax, such as the syntax available in
//! Snowflake, which includes STAGE DDL operations, PUT uploads and COPY INTO.
//!
//! See [this page](https://docs.snowflake.com/en/sql-reference/commands-data-loading) for more details.
#[cfg(not(feature = "std"))]
use alloc::string::String;
#[cfg(not(feature = "std"))]
use alloc::vec::Vec;
use core::fmt;
use core::fmt::Formatter;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
#[cfg(feature = "visitor")]
use sqlparser_derive::{Visit, VisitMut};
/// Stage parameters of a `CREATE STAGE` statement.
///
/// The `Display` implementation writes only the parameters that are set, each one
/// preceded by a single space.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
pub struct StageParamsObject {
/// Displayed as ` URL='<value>'` when present.
pub url: Option<String>,
/// Displayed as ` ENCRYPTION=(<options>)`; omitted when the option list is empty.
pub encryption: DataLoadingOptions,
/// Displayed as ` ENDPOINT='<value>'` when present.
pub endpoint: Option<String>,
/// Displayed unquoted as ` STORAGE_INTEGRATION=<value>` when present.
pub storage_integration: Option<String>,
/// Displayed as ` CREDENTIALS=(<options>)`; omitted when the option list is empty.
pub credentials: DataLoadingOptions,
}
/// An ordered list of `name=value` options, e.g. the contents of a parenthesized
/// option group such as `CREDENTIALS=(...)`.
///
/// The `Display` implementation writes the options in order; the surrounding
/// parentheses are written by the caller.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
pub struct DataLoadingOptions {
/// The options, in the order they should be displayed.
pub options: Vec<DataLoadingOption>,
}
/// Kind of value held by a [`DataLoadingOption`]; it decides whether the value is
/// wrapped in single quotes when displayed.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
pub enum DataLoadingOptionType {
/// Value is displayed inside single quotes.
STRING,
/// Value is displayed without quotes.
BOOLEAN,
/// Value is displayed without quotes.
ENUM,
}
/// A single `name=value` option.
///
/// Displayed as `name='value'` for [`DataLoadingOptionType::STRING`] and as
/// `name=value` for the other option types.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
pub struct DataLoadingOption {
/// Text written to the left of the `=` sign.
pub option_name: String,
/// Controls whether `value` is quoted when displayed.
pub option_type: DataLoadingOptionType,
/// Text written to the right of the `=` sign, stored without surrounding quotes.
pub value: String,
}
/// Writes the stage parameters that are set, each preceded by a space, in the order
/// URL, STORAGE_INTEGRATION, ENDPOINT, CREDENTIALS, ENCRYPTION.
impl fmt::Display for StageParamsObject {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Optional scalar parameters: URL and ENDPOINT are quoted, the integration name is not.
        if let Some(location) = &self.url {
            write!(f, " URL='{}'", location)?;
        }
        if let Some(integration) = &self.storage_integration {
            write!(f, " STORAGE_INTEGRATION={}", integration)?;
        }
        if let Some(target) = &self.endpoint {
            write!(f, " ENDPOINT='{}'", target)?;
        }

        // Option groups are skipped entirely when they carry no options.
        let has_credentials = !self.credentials.options.is_empty();
        if has_credentials {
            write!(f, " CREDENTIALS=({})", self.credentials)?;
        }
        let has_encryption = !self.encryption.options.is_empty();
        if has_encryption {
            write!(f, " ENCRYPTION=({})", self.encryption)?;
        }
        Ok(())
    }
}
/// Writes the options in order, separated by single spaces; an empty list writes nothing.
impl fmt::Display for DataLoadingOptions {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        for (position, option) in self.options.iter().enumerate() {
            // Separate by position rather than by comparing with the last element:
            // a value comparison drops the space after any earlier option that
            // happens to be equal to the last one.
            if position > 0 {
                write!(f, " ")?;
            }
            write!(f, "{}", option)?;
        }
        Ok(())
    }
}
/// Writes `name='value'` for string options and `name=value` for every other option type.
impl fmt::Display for DataLoadingOption {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let Self {
            option_name,
            option_type,
            value,
        } = self;
        match option_type {
            // Only string values are wrapped in single quotes.
            DataLoadingOptionType::STRING => write!(f, "{}='{}'", option_name, value),
            DataLoadingOptionType::ENUM | DataLoadingOptionType::BOOLEAN => {
                write!(f, "{}={}", option_name, value)
            }
        }
    }
}

View file

@ -44,6 +44,7 @@ pub use self::value::{
escape_quoted_string, DateTimeField, DollarQuotedString, TrimWhereField, Value,
};
use crate::ast::helpers::stmt_data_loading::{DataLoadingOptions, StageParamsObject};
#[cfg(feature = "visitor")]
pub use visitor::*;
@ -1524,6 +1525,21 @@ pub enum Statement {
/// Optional parameters.
params: CreateFunctionBody,
},
/// Snowflake `CREATE STAGE` statement.
///
/// ```sql
/// CREATE [OR REPLACE] [TEMPORARY] STAGE [IF NOT EXISTS] <name> ...
/// ```
/// See <https://docs.snowflake.com/en/sql-reference/sql/create-stage>
CreateStage {
/// `OR REPLACE` was specified.
or_replace: bool,
/// `TEMPORARY` was specified.
temporary: bool,
/// `IF NOT EXISTS` was specified.
if_not_exists: bool,
/// Name of the stage.
name: ObjectName,
/// Stage parameters, displayed directly after the name.
stage_params: StageParamsObject,
/// Options of the `DIRECTORY=(...)` group; the group is not displayed when empty.
directory_table_params: DataLoadingOptions,
/// Options of the `FILE_FORMAT=(...)` group; the group is not displayed when empty.
file_format: DataLoadingOptions,
/// Options of the `COPY_OPTIONS=(...)` group; the group is not displayed when empty.
copy_options: DataLoadingOptions,
/// Text of the `COMMENT='...'` clause, if any.
comment: Option<String>,
},
/// `ASSERT <condition> [AS <message>]`
Assert {
condition: Expr,
@ -2746,6 +2762,39 @@ impl fmt::Display for Statement {
}
write!(f, "")
}
// CREATE STAGE: the fixed prefix comes first, then each optional clause that is set.
Statement::CreateStage {
    or_replace,
    temporary,
    if_not_exists,
    name,
    stage_params,
    directory_table_params,
    file_format,
    copy_options,
    comment,
    ..
} => {
    let replace_kw = if *or_replace { "OR REPLACE " } else { "" };
    let temporary_kw = if *temporary { "TEMPORARY " } else { "" };
    let exists_kw = if *if_not_exists { "IF NOT EXISTS " } else { "" };
    write!(
        f,
        "CREATE {or_replace}{temp}STAGE {if_not_exists}{name}{stage_params}",
        or_replace = replace_kw,
        temp = temporary_kw,
        if_not_exists = exists_kw,
    )?;

    // Parenthesized option groups are left out entirely when they have no options.
    let has_directory = !directory_table_params.options.is_empty();
    if has_directory {
        write!(f, " DIRECTORY=({})", directory_table_params)?;
    }
    let has_file_format = !file_format.options.is_empty();
    if has_file_format {
        write!(f, " FILE_FORMAT=({})", file_format)?;
    }
    let has_copy_options = !copy_options.options.is_empty();
    if has_copy_options {
        write!(f, " COPY_OPTIONS=({})", copy_options)?;
    }

    // NOTE(review): the comment text is written verbatim between the quotes —
    // confirm whether embedded single quotes need escaping here.
    if let Some(text) = comment {
        write!(f, " COMMENT='{}'", text)?;
    }
    Ok(())
}
}
}
}
@ -3405,6 +3454,7 @@ pub enum ObjectType {
Schema,
Role,
Sequence,
Stage,
}
impl fmt::Display for ObjectType {
@ -3416,6 +3466,7 @@ impl fmt::Display for ObjectType {
ObjectType::Schema => "SCHEMA",
ObjectType::Role => "ROLE",
ObjectType::Sequence => "SEQUENCE",
ObjectType::Stage => "STAGE",
})
}
}

View file

@ -10,7 +10,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(not(feature = "std"))]
use crate::alloc::string::ToString;
use crate::ast::helpers::stmt_data_loading::{
DataLoadingOption, DataLoadingOptionType, DataLoadingOptions, StageParamsObject,
};
use crate::ast::Statement;
use crate::dialect::Dialect;
use crate::keywords::Keyword;
use crate::parser::{Parser, ParserError};
use crate::tokenizer::Token;
#[cfg(not(feature = "std"))]
use alloc::vec;
#[cfg(not(feature = "std"))]
use alloc::vec::Vec;
#[derive(Debug, Default)]
pub struct SnowflakeDialect;
@ -32,4 +45,205 @@ impl Dialect for SnowflakeDialect {
fn supports_within_after_array_aggregation(&self) -> bool {
true
}
/// Dialect-specific statement hook: recognises `CREATE [OR REPLACE] [TEMPORARY] STAGE`
/// and delegates the rest of the statement to `parse_create_stage`.
///
/// For anything else it returns `None`; when `CREATE` was consumed but no `STAGE`
/// followed, the parser is first stepped back once per consumed keyword.
fn parse_statement(&self, parser: &mut Parser) -> Option<Result<Statement, ParserError>> {
    if !parser.parse_keyword(Keyword::CREATE) {
        return None;
    }

    // possibly CREATE STAGE: [ OR REPLACE ] [ TEMPORARY ]
    let or_replace = parser.parse_keywords(&[Keyword::OR, Keyword::REPLACE]);
    let temporary = parser.parse_keyword(Keyword::TEMPORARY);

    if parser.parse_keyword(Keyword::STAGE) {
        // OK - this is a CREATE STAGE statement
        return Some(parse_create_stage(or_replace, temporary, parser));
    }

    // Some other CREATE statement: give back CREATE (one token), OR REPLACE (two
    // tokens) and TEMPORARY (one token) so the cursor is where it started.
    let replace_tokens = if or_replace { 2 } else { 0 };
    let temporary_tokens = usize::from(temporary);
    for _ in 0..(1 + replace_tokens + temporary_tokens) {
        parser.prev_token();
    }
    None
}
}
/// Parses the remainder of a `CREATE STAGE` statement, starting right after the
/// `STAGE` keyword.
///
/// `or_replace` and `temporary` report whether the caller already consumed
/// `OR REPLACE` and `TEMPORARY`. The optional clauses are accepted in this fixed
/// order: stage parameters, `DIRECTORY=(...)`, `FILE_FORMAT=(...)`,
/// `COPY_OPTIONS=(...)`, `COMMENT='...'`.
///
/// # Errors
///
/// Returns a `ParserError` when the stage name cannot be parsed, when a clause
/// keyword is not followed by `=`, when an option group is malformed, or when the
/// comment is not a single-quoted string.
pub fn parse_create_stage(
    or_replace: bool,
    temporary: bool,
    parser: &mut Parser,
) -> Result<Statement, ParserError> {
    //[ IF NOT EXISTS ]
    let if_not_exists = parser.parse_keywords(&[Keyword::IF, Keyword::NOT, Keyword::EXISTS]);
    let name = parser.parse_object_name()?;

    // [ internalStageParams | externalStageParams ]
    let stage_params = parse_stage_params(parser)?;

    // [ directoryTableParams ]
    let directory_table_params = if parser.parse_keyword(Keyword::DIRECTORY) {
        parser.expect_token(&Token::Eq)?;
        parse_parentheses_options(parser)?
    } else {
        Vec::new()
    };

    // [ file_format ]
    let file_format = if parser.parse_keyword(Keyword::FILE_FORMAT) {
        parser.expect_token(&Token::Eq)?;
        parse_parentheses_options(parser)?
    } else {
        Vec::new()
    };

    // [ copy_options ]
    let copy_options = if parser.parse_keyword(Keyword::COPY_OPTIONS) {
        parser.expect_token(&Token::Eq)?;
        parse_parentheses_options(parser)?
    } else {
        Vec::new()
    };

    // [ comment ]
    let comment = if parser.parse_keyword(Keyword::COMMENT) {
        parser.expect_token(&Token::Eq)?;
        let found = parser.next_token();
        match found.token {
            Token::SingleQuotedString(text) => Some(text),
            // Report the token that was actually rejected; peeking here would
            // name the token that follows it instead.
            _ => return parser.expected("a comment statement", found),
        }
    } else {
        None
    };

    Ok(Statement::CreateStage {
        or_replace,
        temporary,
        if_not_exists,
        name,
        stage_params,
        directory_table_params: DataLoadingOptions {
            options: directory_table_params,
        },
        file_format: DataLoadingOptions {
            options: file_format,
        },
        copy_options: DataLoadingOptions {
            options: copy_options,
        },
        comment,
    })
}
/// Parses the optional stage parameters of `CREATE STAGE`, in this fixed order:
/// `URL='...'`, `STORAGE_INTEGRATION=<token>`, `ENDPOINT='...'`,
/// `CREDENTIALS=(...)`, `ENCRYPTION=(...)`. Parameters that are absent stay
/// `None` or empty.
///
/// # Errors
///
/// Returns a `ParserError` when a parameter keyword is not followed by `=`, when
/// the URL or endpoint is not a single-quoted string, or when an option group is
/// malformed.
fn parse_stage_params(parser: &mut Parser) -> Result<StageParamsObject, ParserError> {
    // URL
    let url = if parser.parse_keyword(Keyword::URL) {
        parser.expect_token(&Token::Eq)?;
        let found = parser.next_token();
        match found.token {
            Token::SingleQuotedString(text) => Some(text),
            // Report the token that was actually rejected, not the one after it.
            _ => return parser.expected("a URL statement", found),
        }
    } else {
        None
    };

    // STORAGE INTEGRATION
    let storage_integration = if parser.parse_keyword(Keyword::STORAGE_INTEGRATION) {
        parser.expect_token(&Token::Eq)?;
        // NOTE(review): whatever token follows `=` is accepted and stored via its
        // display form — confirm whether this should be restricted to identifiers.
        Some(parser.next_token().token.to_string())
    } else {
        None
    };

    // ENDPOINT
    let endpoint = if parser.parse_keyword(Keyword::ENDPOINT) {
        parser.expect_token(&Token::Eq)?;
        let found = parser.next_token();
        match found.token {
            Token::SingleQuotedString(text) => Some(text),
            // Report the token that was actually rejected, not the one after it.
            _ => return parser.expected("an endpoint statement", found),
        }
    } else {
        None
    };

    // CREDENTIALS
    let credentials = if parser.parse_keyword(Keyword::CREDENTIALS) {
        parser.expect_token(&Token::Eq)?;
        DataLoadingOptions {
            options: parse_parentheses_options(parser)?,
        }
    } else {
        DataLoadingOptions { options: vec![] }
    };

    // ENCRYPTION
    let encryption = if parser.parse_keyword(Keyword::ENCRYPTION) {
        parser.expect_token(&Token::Eq)?;
        DataLoadingOptions {
            options: parse_parentheses_options(parser)?,
        }
    } else {
        DataLoadingOptions { options: vec![] }
    };

    Ok(StageParamsObject {
        url,
        encryption,
        endpoint,
        storage_integration,
        credentials,
    })
}
/// Parses options provided within parentheses like:
/// ( ENABLE = { TRUE | FALSE }
/// [ AUTO_REFRESH = { TRUE | FALSE } ]
/// [ REFRESH_ON_CREATE = { TRUE | FALSE } ]
/// [ NOTIFICATION_INTEGRATION = '<notification_integration_name>' ] )
///
/// Each option is `<word> = <value>`. The keywords `TRUE` and `FALSE` produce a
/// `BOOLEAN` option, a single-quoted string a `STRING` option, and any other word
/// an `ENUM` option. Options are returned in source order, and parsing stops at
/// the closing parenthesis.
///
/// # Errors
///
/// Returns a `ParserError` when the opening parenthesis or an `=` is missing,
/// when an option name is not a word, or when a value is neither a word nor a
/// single-quoted string.
fn parse_parentheses_options(parser: &mut Parser) -> Result<Vec<DataLoadingOption>, ParserError> {
    let mut options: Vec<DataLoadingOption> = Vec::new();

    parser.expect_token(&Token::LParen)?;
    loop {
        let name_token = parser.next_token();
        let option_name = match name_token.token {
            Token::RParen => break,
            Token::Word(key) => key.value,
            // Report the token that was actually rejected, not the one after it.
            _ => return parser.expected("another option or ')'", name_token),
        };
        parser.expect_token(&Token::Eq)?;

        let (option_type, value) = if parser.parse_keyword(Keyword::TRUE) {
            (DataLoadingOptionType::BOOLEAN, "TRUE".to_string())
        } else if parser.parse_keyword(Keyword::FALSE) {
            (DataLoadingOptionType::BOOLEAN, "FALSE".to_string())
        } else {
            let value_token = parser.next_token();
            match value_token.token {
                Token::SingleQuotedString(text) => (DataLoadingOptionType::STRING, text),
                Token::Word(word) => (DataLoadingOptionType::ENUM, word.value),
                // The description is a bare noun phrase, like every other
                // `expected` call here.
                _ => return parser.expected("option value", value_token),
            }
        };

        options.push(DataLoadingOption {
            option_name,
            option_type,
            value,
        });
    }
    Ok(options)
}

View file

@ -159,6 +159,7 @@ define_keywords!(
CONTAINS,
CONVERT,
COPY,
COPY_OPTIONS,
CORR,
CORRESPONDING,
COUNT,
@ -167,6 +168,7 @@ define_keywords!(
CREATE,
CREATEDB,
CREATEROLE,
CREDENTIALS,
CROSS,
CSV,
CUBE,
@ -220,8 +222,10 @@ define_keywords!(
ELEMENT,
ELSE,
ENCODING,
ENCRYPTION,
END,
END_EXEC = "END-EXEC",
ENDPOINT,
END_FRAME,
END_PARTITION,
ENGINE,
@ -248,6 +252,7 @@ define_keywords!(
FETCH,
FIELDS,
FILE,
FILE_FORMAT,
FILTER,
FIRST,
FIRST_VALUE,
@ -531,6 +536,7 @@ define_keywords!(
SQLWARNING,
SQRT,
STABLE,
STAGE,
START,
STATIC,
STATISTICS,
@ -538,6 +544,7 @@ define_keywords!(
STDDEV_SAMP,
STDIN,
STDOUT,
STORAGE_INTEGRATION,
STORED,
STRING,
SUBMULTISET,
@ -600,6 +607,7 @@ define_keywords!(
UNTIL,
UPDATE,
UPPER,
URL,
USAGE,
USE,
USER,

View file

@ -3017,11 +3017,13 @@ impl<'a> Parser<'a> {
ObjectType::Schema
} else if self.parse_keyword(Keyword::SEQUENCE) {
ObjectType::Sequence
} else if self.parse_keyword(Keyword::STAGE) {
ObjectType::Stage
} else if self.parse_keyword(Keyword::FUNCTION) {
return self.parse_drop_function();
} else {
return self.expected(
"TABLE, VIEW, INDEX, ROLE, SCHEMA, FUNCTION or SEQUENCE after DROP",
"TABLE, VIEW, INDEX, ROLE, SCHEMA, FUNCTION, STAGE or SEQUENCE after DROP",
self.peek_token(),
);
};

View file

@ -14,6 +14,7 @@
//! Test SQL syntax specific to Snowflake. The parser based on the
//! generic dialect is also tested (on the inputs it can handle).
use sqlparser::ast::helpers::stmt_data_loading::{DataLoadingOption, DataLoadingOptionType};
use sqlparser::ast::*;
use sqlparser::dialect::{GenericDialect, SnowflakeDialect};
use sqlparser::parser::ParserError;
@ -506,3 +507,230 @@ fn test_alter_table_swap_with() {
_ => unreachable!(),
};
}
/// `DROP STAGE` parses for both dialects, with and without `IF EXISTS`, and
/// round-trips unchanged.
#[test]
fn test_drop_stage() {
    // (statement, whether `IF EXISTS` should be recorded)
    let cases = [("DROP STAGE s1", false), ("DROP STAGE IF EXISTS s1", true)];

    for &(sql, expect_if_exists) in &cases {
        let stmt = snowflake_and_generic().verified_stmt(sql);
        if let Statement::Drop {
            names, if_exists, ..
        } = stmt
        {
            assert_eq!(expect_if_exists, if_exists);
            assert_eq!("s1", names[0].to_string());
        } else {
            unreachable!();
        }
    }

    for &(sql, _) in &cases {
        snowflake_and_generic().one_statement_parses_to(sql, sql);
    }
}
/// Minimal `CREATE STAGE`, then the variant with every leading modifier and a comment.
#[test]
fn test_create_stage() {
    // Bare statement: no modifiers, no comment.
    let sql = "CREATE STAGE s1.s2";
    if let Statement::CreateStage {
        or_replace,
        temporary,
        if_not_exists,
        name,
        comment,
        ..
    } = snowflake().verified_stmt(sql)
    {
        assert_eq!((false, false, false), (or_replace, temporary, if_not_exists));
        assert_eq!("s1.s2", name.to_string());
        assert_eq!(None, comment);
    } else {
        unreachable!();
    }
    assert_eq!(snowflake().verified_stmt(sql).to_string(), sql);

    // All modifiers plus a trailing comment.
    let extended_sql = concat!(
        "CREATE OR REPLACE TEMPORARY STAGE IF NOT EXISTS s1.s2 ",
        "COMMENT='some-comment'"
    );
    if let Statement::CreateStage {
        or_replace,
        temporary,
        if_not_exists,
        name,
        stage_params,
        comment,
        ..
    } = snowflake().verified_stmt(extended_sql)
    {
        assert_eq!((true, true, true), (or_replace, temporary, if_not_exists));
        assert_eq!(None, stage_params.url);
        assert_eq!(None, stage_params.endpoint);
        assert_eq!("s1.s2", name.to_string());
        assert_eq!(Some("some-comment"), comment.as_deref());
    } else {
        unreachable!();
    }
    assert_eq!(
        snowflake().verified_stmt(extended_sql).to_string(),
        extended_sql
    );
}
/// All five stage parameters are parsed into `stage_params` and round-trip unchanged.
#[test]
fn test_create_stage_with_stage_params() {
    let sql = concat!(
        "CREATE OR REPLACE STAGE my_ext_stage ",
        "URL='s3://load/files/' ",
        "STORAGE_INTEGRATION=myint ",
        "ENDPOINT='<s3_api_compatible_endpoint>' ",
        "CREDENTIALS=(AWS_KEY_ID='1a2b3c' AWS_SECRET_KEY='4x5y6z') ",
        "ENCRYPTION=(MASTER_KEY='key' TYPE='AWS_SSE_KMS')"
    );

    // Every option in this statement is a quoted string.
    let string_option = |name: &str, value: &str| DataLoadingOption {
        option_name: name.to_string(),
        option_type: DataLoadingOptionType::STRING,
        value: value.to_string(),
    };

    let stage_params = match snowflake().verified_stmt(sql) {
        Statement::CreateStage { stage_params, .. } => stage_params,
        _ => unreachable!(),
    };

    assert_eq!(Some("s3://load/files/"), stage_params.url.as_deref());
    assert_eq!(Some("myint"), stage_params.storage_integration.as_deref());
    assert_eq!(
        Some("<s3_api_compatible_endpoint>"),
        stage_params.endpoint.as_deref()
    );

    let credentials = &stage_params.credentials.options;
    assert!(credentials.contains(&string_option("AWS_KEY_ID", "1a2b3c")));
    assert!(credentials.contains(&string_option("AWS_SECRET_KEY", "4x5y6z")));

    let encryption = &stage_params.encryption.options;
    assert!(encryption.contains(&string_option("MASTER_KEY", "key")));
    assert!(encryption.contains(&string_option("TYPE", "AWS_SSE_KMS")));

    assert_eq!(snowflake().verified_stmt(sql).to_string(), sql);
}
/// The `DIRECTORY=(...)` group yields boolean and string options and round-trips unchanged.
#[test]
fn test_create_stage_with_directory_table_params() {
    let sql = concat!(
        "CREATE OR REPLACE STAGE my_ext_stage ",
        "URL='s3://load/files/' ",
        "DIRECTORY=(ENABLE=TRUE REFRESH_ON_CREATE=FALSE NOTIFICATION_INTEGRATION='some-string')"
    );

    // (name, type, value) of every option expected in the parsed group
    let expected = [
        ("ENABLE", DataLoadingOptionType::BOOLEAN, "TRUE"),
        ("REFRESH_ON_CREATE", DataLoadingOptionType::BOOLEAN, "FALSE"),
        (
            "NOTIFICATION_INTEGRATION",
            DataLoadingOptionType::STRING,
            "some-string",
        ),
    ];

    let parsed = match snowflake().verified_stmt(sql) {
        Statement::CreateStage {
            directory_table_params,
            ..
        } => directory_table_params,
        _ => unreachable!(),
    };

    for (name, kind, value) in expected.iter() {
        let wanted = DataLoadingOption {
            option_name: name.to_string(),
            option_type: kind.clone(),
            value: value.to_string(),
        };
        assert!(parsed.options.contains(&wanted));
    }

    assert_eq!(snowflake().verified_stmt(sql).to_string(), sql);
}
/// The `FILE_FORMAT=(...)` group yields enum and string options and round-trips unchanged.
#[test]
fn test_create_stage_with_file_format() {
    let sql = concat!(
        "CREATE OR REPLACE STAGE my_ext_stage ",
        "URL='s3://load/files/' ",
        "FILE_FORMAT=(COMPRESSION=AUTO BINARY_FORMAT=HEX ESCAPE='\\')"
    );

    // (name, type, value) of every option expected in the parsed group
    let expected = [
        ("COMPRESSION", DataLoadingOptionType::ENUM, "AUTO"),
        ("BINARY_FORMAT", DataLoadingOptionType::ENUM, "HEX"),
        ("ESCAPE", DataLoadingOptionType::STRING, "\\"),
    ];

    let parsed = match snowflake().verified_stmt(sql) {
        Statement::CreateStage { file_format, .. } => file_format,
        _ => unreachable!(),
    };

    for (name, kind, value) in expected.iter() {
        let wanted = DataLoadingOption {
            option_name: name.to_string(),
            option_type: kind.clone(),
            value: value.to_string(),
        };
        assert!(parsed.options.contains(&wanted));
    }

    assert_eq!(snowflake().verified_stmt(sql).to_string(), sql);
}
/// The `COPY_OPTIONS=(...)` group yields enum and boolean options and round-trips unchanged.
#[test]
fn test_create_stage_with_copy_options() {
    let sql = concat!(
        "CREATE OR REPLACE STAGE my_ext_stage ",
        "URL='s3://load/files/' ",
        "COPY_OPTIONS=(ON_ERROR=CONTINUE FORCE=TRUE)"
    );

    // (name, type, value) of every option expected in the parsed group
    let expected = [
        ("ON_ERROR", DataLoadingOptionType::ENUM, "CONTINUE"),
        ("FORCE", DataLoadingOptionType::BOOLEAN, "TRUE"),
    ];

    let parsed = match snowflake().verified_stmt(sql) {
        Statement::CreateStage { copy_options, .. } => copy_options,
        _ => unreachable!(),
    };

    for (name, kind, value) in expected.iter() {
        let wanted = DataLoadingOption {
            option_name: name.to_string(),
            option_type: kind.clone(),
            value: value.to_string(),
        };
        assert!(parsed.options.contains(&wanted));
    }

    assert_eq!(snowflake().verified_stmt(sql).to_string(), sql);
}