add support for MERGE statement (#430)

* add support for MERGE statement

Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>

* fix lint errors

Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
This commit is contained in:
Maciej Obuchowski 2022-03-12 20:28:57 +01:00 committed by GitHub
parent c688bbb4de
commit 6b55e51101
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 327 additions and 0 deletions

View file

@ -956,6 +956,19 @@ pub enum Statement {
/// A SQL query that specifies what to explain /// A SQL query that specifies what to explain
statement: Box<Statement>, statement: Box<Statement>,
}, },
// MERGE INTO statement, based on Snowflake. See <https://docs.snowflake.com/en/sql-reference/sql/merge.html>
Merge {
// Specifies the table to merge
table: TableFactor,
// Specifies the table or subquery to join with the target table
source: Box<SetExpr>,
// Specifies alias to the table that is joined with target table
alias: Option<TableAlias>,
// Specifies the expression on which to join the target table and source
on: Box<Expr>,
// Specifies the actions to perform when values match or do not match.
clauses: Vec<MergeClause>,
},
} }
impl fmt::Display for Statement { impl fmt::Display for Statement {
@ -1606,6 +1619,20 @@ impl fmt::Display for Statement {
write!(f, "NULL") write!(f, "NULL")
} }
} }
Statement::Merge {
table,
source,
alias,
on,
clauses,
} => {
write!(f, "MERGE INTO {} USING {} ", table, source)?;
if let Some(a) = alias {
write!(f, "as {} ", a)?;
};
write!(f, "ON {} ", on)?;
write!(f, "{}", display_separated(clauses, " "))
}
} }
} }
} }
@ -2137,6 +2164,68 @@ impl fmt::Display for SqliteOnConflict {
} }
} }
///
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum MergeClause {
MatchedUpdate {
predicate: Option<Expr>,
assignments: Vec<Assignment>,
},
MatchedDelete(Option<Expr>),
NotMatched {
predicate: Option<Expr>,
columns: Vec<Ident>,
values: Values,
},
}
impl fmt::Display for MergeClause {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
use MergeClause::*;
write!(f, "WHEN")?;
match self {
MatchedUpdate {
predicate,
assignments,
} => {
write!(f, " MATCHED")?;
if let Some(pred) = predicate {
write!(f, " AND {}", pred)?;
}
write!(
f,
" THEN UPDATE SET {}",
display_comma_separated(assignments)
)
}
MatchedDelete(predicate) => {
write!(f, " MATCHED")?;
if let Some(pred) = predicate {
write!(f, " AND {}", pred)?;
}
write!(f, " THEN DELETE")
}
NotMatched {
predicate,
columns,
values,
} => {
write!(f, " NOT MATCHED")?;
if let Some(pred) = predicate {
write!(f, " AND {}", pred)?;
}
write!(
f,
" THEN INSERT ({}) {}",
display_comma_separated(columns),
values
)
}
}
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

View file

@ -295,6 +295,7 @@ define_keywords!(
LOWER, LOWER,
MANAGEDLOCATION, MANAGEDLOCATION,
MATCH, MATCH,
MATCHED,
MATERIALIZED, MATERIALIZED,
MAX, MAX,
MEMBER, MEMBER,

View file

@ -187,6 +187,7 @@ impl<'a> Parser<'a> {
Keyword::DEALLOCATE => Ok(self.parse_deallocate()?), Keyword::DEALLOCATE => Ok(self.parse_deallocate()?),
Keyword::EXECUTE => Ok(self.parse_execute()?), Keyword::EXECUTE => Ok(self.parse_execute()?),
Keyword::PREPARE => Ok(self.parse_prepare()?), Keyword::PREPARE => Ok(self.parse_prepare()?),
Keyword::MERGE => Ok(self.parse_merge()?),
Keyword::REPLACE if dialect_of!(self is SQLiteDialect ) => { Keyword::REPLACE if dialect_of!(self is SQLiteDialect ) => {
self.prev_token(); self.prev_token();
Ok(self.parse_insert()?) Ok(self.parse_insert()?)
@ -3892,6 +3893,104 @@ impl<'a> Parser<'a> {
comment, comment,
}) })
} }
pub fn parse_merge_clauses(&mut self) -> Result<Vec<MergeClause>, ParserError> {
let mut clauses: Vec<MergeClause> = vec![];
loop {
if self.peek_token() == Token::EOF {
break;
}
self.expect_keyword(Keyword::WHEN)?;
let is_not_matched = self.parse_keyword(Keyword::NOT);
self.expect_keyword(Keyword::MATCHED)?;
let predicate = if self.parse_keyword(Keyword::AND) {
Some(self.parse_expr()?)
} else {
None
};
self.expect_keyword(Keyword::THEN)?;
clauses.push(
match self.parse_one_of_keywords(&[
Keyword::UPDATE,
Keyword::INSERT,
Keyword::DELETE,
]) {
Some(Keyword::UPDATE) => {
if is_not_matched {
return Err(ParserError::ParserError(
"UPDATE in NOT MATCHED merge clause".to_string(),
));
}
self.expect_keyword(Keyword::SET)?;
let assignments = self.parse_comma_separated(Parser::parse_assignment)?;
MergeClause::MatchedUpdate {
predicate,
assignments,
}
}
Some(Keyword::DELETE) => {
if is_not_matched {
return Err(ParserError::ParserError(
"DELETE in NOT MATCHED merge clause".to_string(),
));
}
MergeClause::MatchedDelete(predicate)
}
Some(Keyword::INSERT) => {
if !is_not_matched {
return Err(ParserError::ParserError(
"INSERT in MATCHED merge clause".to_string(),
));
}
let columns = self.parse_parenthesized_column_list(Optional)?;
self.expect_keyword(Keyword::VALUES)?;
let values = self.parse_values()?;
MergeClause::NotMatched {
predicate,
columns,
values,
}
}
Some(_) => {
return Err(ParserError::ParserError(
"expected UPDATE, DELETE or INSERT in merge clause".to_string(),
))
}
None => {
return Err(ParserError::ParserError(
"expected UPDATE, DELETE or INSERT in merge clause".to_string(),
))
}
},
);
}
Ok(clauses)
}
pub fn parse_merge(&mut self) -> Result<Statement, ParserError> {
self.expect_keyword(Keyword::INTO)?;
let table = self.parse_table_factor()?;
self.expect_keyword(Keyword::USING)?;
let source = self.parse_query_body(0)?;
let alias = self.parse_optional_table_alias(keywords::RESERVED_FOR_TABLE_ALIAS)?;
self.expect_keyword(Keyword::ON)?;
let on = self.parse_expr()?;
let clauses = self.parse_merge_clauses()?;
Ok(Statement::Merge {
table,
source: Box::new(source),
alias,
on: Box::new(on),
clauses,
})
}
} }
impl Word { impl Word {

View file

@ -4204,6 +4204,144 @@ fn test_revoke() {
} }
} }
#[test]
fn parse_merge() {
let sql = "MERGE INTO s.bar AS dest USING (SELECT * FROM s.foo) as stg ON dest.D = stg.D AND dest.E = stg.E WHEN NOT MATCHED THEN INSERT (A, B, C) VALUES (stg.A, stg.B, stg.C) WHEN MATCHED AND dest.A = 'a' THEN UPDATE SET dest.F = stg.F, dest.G = stg.G WHEN MATCHED THEN DELETE";
match verified_stmt(sql) {
Statement::Merge {
table,
source,
alias,
on,
clauses,
} => {
assert_eq!(
table,
TableFactor::Table {
name: ObjectName(vec![Ident::new("s"), Ident::new("bar")]),
alias: Some(TableAlias {
name: Ident::new("dest"),
columns: vec![]
}),
args: vec![],
with_hints: vec![]
}
);
assert_eq!(
source,
Box::new(SetExpr::Query(Box::new(Query {
with: None,
body: SetExpr::Select(Box::new(Select {
distinct: false,
top: None,
projection: vec![SelectItem::Wildcard],
from: vec![TableWithJoins {
relation: TableFactor::Table {
name: ObjectName(vec![Ident::new("s"), Ident::new("foo")]),
alias: None,
args: vec![],
with_hints: vec![],
},
joins: vec![]
}],
lateral_views: vec![],
selection: None,
group_by: vec![],
cluster_by: vec![],
distribute_by: vec![],
sort_by: vec![],
having: None
})),
order_by: vec![],
limit: None,
offset: None,
fetch: None,
lock: None
})))
);
assert_eq!(
alias,
Some(TableAlias {
name: Ident::new("stg"),
columns: vec![]
})
);
assert_eq!(
on,
Box::new(Expr::BinaryOp {
left: Box::new(Expr::BinaryOp {
left: Box::new(Expr::CompoundIdentifier(vec![
Ident::new("dest"),
Ident::new("D")
])),
op: BinaryOperator::Eq,
right: Box::new(Expr::CompoundIdentifier(vec![
Ident::new("stg"),
Ident::new("D")
]))
}),
op: BinaryOperator::And,
right: Box::new(Expr::BinaryOp {
left: Box::new(Expr::CompoundIdentifier(vec![
Ident::new("dest"),
Ident::new("E")
])),
op: BinaryOperator::Eq,
right: Box::new(Expr::CompoundIdentifier(vec![
Ident::new("stg"),
Ident::new("E")
]))
})
})
);
assert_eq!(
clauses,
vec![
MergeClause::NotMatched {
predicate: None,
columns: vec![Ident::new("A"), Ident::new("B"), Ident::new("C")],
values: Values(vec![vec![
Expr::CompoundIdentifier(vec![Ident::new("stg"), Ident::new("A")]),
Expr::CompoundIdentifier(vec![Ident::new("stg"), Ident::new("B")]),
Expr::CompoundIdentifier(vec![Ident::new("stg"), Ident::new("C")]),
]])
},
MergeClause::MatchedUpdate {
predicate: Some(Expr::BinaryOp {
left: Box::new(Expr::CompoundIdentifier(vec![
Ident::new("dest"),
Ident::new("A")
])),
op: BinaryOperator::Eq,
right: Box::new(Expr::Value(Value::SingleQuotedString(
"a".to_string()
)))
}),
assignments: vec![
Assignment {
id: vec![Ident::new("dest"), Ident::new("F")],
value: Expr::CompoundIdentifier(vec![
Ident::new("stg"),
Ident::new("F")
])
},
Assignment {
id: vec![Ident::new("dest"), Ident::new("G")],
value: Expr::CompoundIdentifier(vec![
Ident::new("stg"),
Ident::new("G")
])
}
]
},
MergeClause::MatchedDelete(None)
]
)
}
_ => unreachable!(),
}
}
#[test] #[test]
fn test_lock() { fn test_lock() {
let sql = "SELECT * FROM student WHERE id = '1' FOR UPDATE"; let sql = "SELECT * FROM student WHERE id = '1' FOR UPDATE";