Snowflake COPY INTO target columns, select items and optional alias (#1805)

This commit is contained in:
Yoav Cohen 2025-04-11 11:49:43 +02:00 committed by GitHub
parent 67c3be075e
commit d090ad4ccf
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 147 additions and 91 deletions

View file

@ -29,7 +29,7 @@ use core::fmt;
use serde::{Deserialize, Serialize};
use crate::ast::helpers::key_value_options::KeyValueOptions;
use crate::ast::{Ident, ObjectName};
use crate::ast::{Ident, ObjectName, SelectItem};
#[cfg(feature = "visitor")]
use sqlparser_derive::{Visit, VisitMut};
@ -44,6 +44,25 @@ pub struct StageParamsObject {
pub credentials: KeyValueOptions,
}
/// This enum enables support for both standard SQL select item expressions
/// and Snowflake-specific ones for data loading.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
pub enum StageLoadSelectItemKind {
SelectItem(SelectItem),
StageLoadSelectItem(StageLoadSelectItem),
}
impl fmt::Display for StageLoadSelectItemKind {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match &self {
StageLoadSelectItemKind::SelectItem(item) => write!(f, "{item}"),
StageLoadSelectItemKind::StageLoadSelectItem(item) => write!(f, "{item}"),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]

View file

@ -23,7 +23,10 @@ use alloc::{
string::{String, ToString},
vec::Vec,
};
use helpers::{attached_token::AttachedToken, stmt_data_loading::FileStagingCommand};
use helpers::{
attached_token::AttachedToken,
stmt_data_loading::{FileStagingCommand, StageLoadSelectItemKind},
};
use core::ops::Deref;
use core::{
@ -92,7 +95,7 @@ pub use self::value::{
};
use crate::ast::helpers::key_value_options::KeyValueOptions;
use crate::ast::helpers::stmt_data_loading::{StageLoadSelectItem, StageParamsObject};
use crate::ast::helpers::stmt_data_loading::StageParamsObject;
#[cfg(feature = "visitor")]
pub use visitor::*;
@ -2988,10 +2991,11 @@ pub enum Statement {
CopyIntoSnowflake {
kind: CopyIntoSnowflakeKind,
into: ObjectName,
into_columns: Option<Vec<Ident>>,
from_obj: Option<ObjectName>,
from_obj_alias: Option<Ident>,
stage_params: StageParamsObject,
from_transformations: Option<Vec<StageLoadSelectItem>>,
from_transformations: Option<Vec<StageLoadSelectItemKind>>,
from_query: Option<Box<Query>>,
files: Option<Vec<String>>,
pattern: Option<String>,
@ -5583,6 +5587,7 @@ impl fmt::Display for Statement {
Statement::CopyIntoSnowflake {
kind,
into,
into_columns,
from_obj,
from_obj_alias,
stage_params,
@ -5596,6 +5601,9 @@ impl fmt::Display for Statement {
partition,
} => {
write!(f, "COPY INTO {}", into)?;
if let Some(into_columns) = into_columns {
write!(f, " ({})", display_comma_separated(into_columns))?;
}
if let Some(from_transformations) = from_transformations {
// Data load with transformation
if let Some(from_stage) = from_obj {

View file

@ -350,6 +350,7 @@ impl Spanned for Statement {
} => source.span(),
Statement::CopyIntoSnowflake {
into: _,
into_columns: _,
from_obj: _,
from_obj_alias: _,
stage_params: _,

View file

@ -20,7 +20,7 @@ use crate::alloc::string::ToString;
use crate::ast::helpers::key_value_options::{KeyValueOption, KeyValueOptionType, KeyValueOptions};
use crate::ast::helpers::stmt_create_table::CreateTableBuilder;
use crate::ast::helpers::stmt_data_loading::{
FileStagingCommand, StageLoadSelectItem, StageParamsObject,
FileStagingCommand, StageLoadSelectItem, StageLoadSelectItemKind, StageParamsObject,
};
use crate::ast::{
ColumnOption, ColumnPolicy, ColumnPolicyProperty, CopyIntoSnowflakeKind, Ident,
@ -30,7 +30,7 @@ use crate::ast::{
};
use crate::dialect::{Dialect, Precedence};
use crate::keywords::Keyword;
use crate::parser::{Parser, ParserError};
use crate::parser::{IsOptional, Parser, ParserError};
use crate::tokenizer::{Token, Word};
#[cfg(not(feature = "std"))]
use alloc::boxed::Box;
@ -722,7 +722,7 @@ pub fn parse_copy_into(parser: &mut Parser) -> Result<Statement, ParserError> {
};
let mut files: Vec<String> = vec![];
let mut from_transformations: Option<Vec<StageLoadSelectItem>> = None;
let mut from_transformations: Option<Vec<StageLoadSelectItemKind>> = None;
let mut from_stage_alias = None;
let mut from_stage = None;
let mut stage_params = StageParamsObject {
@ -744,6 +744,11 @@ pub fn parse_copy_into(parser: &mut Parser) -> Result<Statement, ParserError> {
stage_params = parse_stage_params(parser)?;
}
let into_columns = match &parser.peek_token().token {
Token::LParen => Some(parser.parse_parenthesized_column_list(IsOptional::Optional, true)?),
_ => None,
};
parser.expect_keyword_is(Keyword::FROM)?;
match parser.next_token().token {
Token::LParen if kind == CopyIntoSnowflakeKind::Table => {
@ -755,15 +760,10 @@ pub fn parse_copy_into(parser: &mut Parser) -> Result<Statement, ParserError> {
from_stage = Some(parse_snowflake_stage_name(parser)?);
stage_params = parse_stage_params(parser)?;
// as
from_stage_alias = if parser.parse_keyword(Keyword::AS) {
Some(match parser.next_token().token {
Token::Word(w) => Ok(Ident::new(w.value)),
_ => parser.expected("stage alias", parser.peek_token()),
}?)
} else {
None
};
// Parse an optional alias
from_stage_alias = parser
.maybe_parse_table_alias()?
.map(|table_alias| table_alias.name);
parser.expect_token(&Token::RParen)?;
}
Token::LParen if kind == CopyIntoSnowflakeKind::Location => {
@ -846,6 +846,7 @@ pub fn parse_copy_into(parser: &mut Parser) -> Result<Statement, ParserError> {
Ok(Statement::CopyIntoSnowflake {
kind,
into,
into_columns,
from_obj: from_stage,
from_obj_alias: from_stage_alias,
stage_params,
@ -866,86 +867,93 @@ pub fn parse_copy_into(parser: &mut Parser) -> Result<Statement, ParserError> {
fn parse_select_items_for_data_load(
parser: &mut Parser,
) -> Result<Option<Vec<StageLoadSelectItem>>, ParserError> {
// [<alias>.]$<file_col_num>[.<element>] [ , [<alias>.]$<file_col_num>[.<element>] ... ]
let mut select_items: Vec<StageLoadSelectItem> = vec![];
) -> Result<Option<Vec<StageLoadSelectItemKind>>, ParserError> {
let mut select_items: Vec<StageLoadSelectItemKind> = vec![];
loop {
let mut alias: Option<Ident> = None;
let mut file_col_num: i32 = 0;
let mut element: Option<Ident> = None;
let mut item_as: Option<Ident> = None;
match parser.maybe_parse(parse_select_item_for_data_load)? {
// [<alias>.]$<file_col_num>[.<element>] [ , [<alias>.]$<file_col_num>[.<element>] ... ]
Some(item) => select_items.push(StageLoadSelectItemKind::StageLoadSelectItem(item)),
// Fallback, try to parse a standard SQL select item
None => select_items.push(StageLoadSelectItemKind::SelectItem(
parser.parse_select_item()?,
)),
}
if matches!(parser.peek_token_ref().token, Token::Comma) {
parser.advance_token();
} else {
break;
}
}
Ok(Some(select_items))
}
let next_token = parser.next_token();
match next_token.token {
fn parse_select_item_for_data_load(
parser: &mut Parser,
) -> Result<StageLoadSelectItem, ParserError> {
let mut alias: Option<Ident> = None;
let mut file_col_num: i32 = 0;
let mut element: Option<Ident> = None;
let mut item_as: Option<Ident> = None;
let next_token = parser.next_token();
match next_token.token {
Token::Placeholder(w) => {
file_col_num = w.to_string().split_off(1).parse::<i32>().map_err(|e| {
ParserError::ParserError(format!("Could not parse '{w}' as i32: {e}"))
})?;
Ok(())
}
Token::Word(w) => {
alias = Some(Ident::new(w.value));
Ok(())
}
_ => parser.expected("alias or file_col_num", next_token),
}?;
if alias.is_some() {
parser.expect_token(&Token::Period)?;
// now we get col_num token
let col_num_token = parser.next_token();
match col_num_token.token {
Token::Placeholder(w) => {
file_col_num = w.to_string().split_off(1).parse::<i32>().map_err(|e| {
ParserError::ParserError(format!("Could not parse '{w}' as i32: {e}"))
})?;
Ok(())
}
Token::Word(w) => {
alias = Some(Ident::new(w.value));
Ok(())
}
_ => parser.expected("alias or file_col_num", next_token),
_ => parser.expected("file_col_num", col_num_token),
}?;
}
if alias.is_some() {
parser.expect_token(&Token::Period)?;
// now we get col_num token
let col_num_token = parser.next_token();
match col_num_token.token {
Token::Placeholder(w) => {
file_col_num = w.to_string().split_off(1).parse::<i32>().map_err(|e| {
ParserError::ParserError(format!("Could not parse '{w}' as i32: {e}"))
})?;
Ok(())
}
_ => parser.expected("file_col_num", col_num_token),
}?;
// try extracting optional element
match parser.next_token().token {
Token::Colon => {
// parse element
element = Some(Ident::new(match parser.next_token().token {
Token::Word(w) => Ok(w.value),
_ => parser.expected("file_col_num", parser.peek_token()),
}?));
}
// try extracting optional element
match parser.next_token().token {
Token::Colon => {
// parse element
element = Some(Ident::new(match parser.next_token().token {
Token::Word(w) => Ok(w.value),
_ => parser.expected("file_col_num", parser.peek_token()),
}?));
}
_ => {
// element not present move back
parser.prev_token();
}
}
// as
if parser.parse_keyword(Keyword::AS) {
item_as = Some(match parser.next_token().token {
Token::Word(w) => Ok(Ident::new(w.value)),
_ => parser.expected("column item alias", parser.peek_token()),
}?);
}
select_items.push(StageLoadSelectItem {
alias,
file_col_num,
element,
item_as,
});
match parser.next_token().token {
Token::Comma => {
// continue
}
_ => {
parser.prev_token(); // need to move back
break;
}
_ => {
// element not present move back
parser.prev_token();
}
}
Ok(Some(select_items))
// as
if parser.parse_keyword(Keyword::AS) {
item_as = Some(match parser.next_token().token {
Token::Word(w) => Ok(Ident::new(w.value)),
_ => parser.expected("column item alias", parser.peek_token()),
}?);
}
Ok(StageLoadSelectItem {
alias,
file_col_num,
element,
item_as,
})
}
fn parse_stage_params(parser: &mut Parser) -> Result<StageParamsObject, ParserError> {

View file

@ -20,7 +20,7 @@
//! generic dialect is also tested (on the inputs it can handle).
use sqlparser::ast::helpers::key_value_options::{KeyValueOption, KeyValueOptionType};
use sqlparser::ast::helpers::stmt_data_loading::StageLoadSelectItem;
use sqlparser::ast::helpers::stmt_data_loading::{StageLoadSelectItem, StageLoadSelectItemKind};
use sqlparser::ast::*;
use sqlparser::dialect::{Dialect, GenericDialect, SnowflakeDialect};
use sqlparser::parser::{ParserError, ParserOptions};
@ -2256,7 +2256,7 @@ fn test_copy_into_with_files_and_pattern_and_verification() {
fn test_copy_into_with_transformations() {
let sql = concat!(
"COPY INTO my_company.emp_basic FROM ",
"(SELECT t1.$1:st AS st, $1:index, t2.$1 FROM @schema.general_finished AS T) ",
"(SELECT t1.$1:st AS st, $1:index, t2.$1, 4, '5' AS const_str FROM @schema.general_finished AS T) ",
"FILES = ('file1.json', 'file2.json') ",
"PATTERN = '.*employees0[1-5].csv.gz' ",
"VALIDATION_MODE = RETURN_7_ROWS"
@ -2277,35 +2277,55 @@ fn test_copy_into_with_transformations() {
);
assert_eq!(
from_transformations.as_ref().unwrap()[0],
StageLoadSelectItem {
StageLoadSelectItemKind::StageLoadSelectItem(StageLoadSelectItem {
alias: Some(Ident::new("t1")),
file_col_num: 1,
element: Some(Ident::new("st")),
item_as: Some(Ident::new("st"))
}
})
);
assert_eq!(
from_transformations.as_ref().unwrap()[1],
StageLoadSelectItem {
StageLoadSelectItemKind::StageLoadSelectItem(StageLoadSelectItem {
alias: None,
file_col_num: 1,
element: Some(Ident::new("index")),
item_as: None
}
})
);
assert_eq!(
from_transformations.as_ref().unwrap()[2],
StageLoadSelectItem {
StageLoadSelectItemKind::StageLoadSelectItem(StageLoadSelectItem {
alias: Some(Ident::new("t2")),
file_col_num: 1,
element: None,
item_as: None
}
})
);
assert_eq!(
from_transformations.as_ref().unwrap()[3],
StageLoadSelectItemKind::SelectItem(SelectItem::UnnamedExpr(Expr::Value(
Value::Number("4".parse().unwrap(), false).into()
)))
);
assert_eq!(
from_transformations.as_ref().unwrap()[4],
StageLoadSelectItemKind::SelectItem(SelectItem::ExprWithAlias {
expr: Expr::Value(Value::SingleQuotedString("5".parse().unwrap()).into()),
alias: Ident::new("const_str".to_string())
})
);
}
_ => unreachable!(),
}
assert_eq!(snowflake().verified_stmt(sql).to_string(), sql);
// Test optional AS keyword to denote an alias for the stage
let sql1 = concat!(
"COPY INTO my_company.emp_basic FROM ",
"(SELECT t1.$1:st AS st, $1:index, t2.$1, 4, '5' AS const_str FROM @schema.general_finished T) "
);
snowflake().parse_sql_statements(sql1).unwrap();
}
#[test]