feat: support INSERT INTO [TABLE] FUNCTION of Clickhouse (#1633)

Co-authored-by: Kermit <chenjiawei1@xiaohongshu.com>
Co-authored-by: Ifeanyi Ubah <ify1992@yahoo.com>
This commit is contained in:
cjw 2025-01-10 22:23:56 +08:00 committed by GitHub
parent c54ba4dc41
commit b09514e492
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 147 additions and 53 deletions

View file

@ -36,7 +36,7 @@ use super::{
FileFormat, FromTable, HiveDistributionStyle, HiveFormat, HiveIOFormat, HiveRowFormat, Ident,
InsertAliases, MysqlInsertPriority, ObjectName, OnCommit, OnInsert, OneOrManyWithParens,
OrderByExpr, Query, RowAccessPolicy, SelectItem, SqlOption, SqliteOnConflict, TableEngine,
TableWithJoins, Tag, WrappedCollection,
TableObject, TableWithJoins, Tag, WrappedCollection,
};
/// CREATE INDEX statement.
@ -470,8 +470,7 @@ pub struct Insert {
/// INTO - optional keyword
pub into: bool,
/// TABLE
#[cfg_attr(feature = "visitor", visit(with = "visit_relation"))]
pub table_name: ObjectName,
pub table: TableObject,
/// table_name as foo (for PostgreSQL)
pub table_alias: Option<Ident>,
/// COLUMNS
@ -488,7 +487,7 @@ pub struct Insert {
/// Columns defined after PARTITION
pub after_columns: Vec<Ident>,
/// whether the insert has the table keyword (Hive)
pub table: bool,
pub has_table_keyword: bool,
pub on: Option<OnInsert>,
/// RETURNING
pub returning: Option<Vec<SelectItem>>,
@ -503,9 +502,9 @@ pub struct Insert {
impl Display for Insert {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let table_name = if let Some(alias) = &self.table_alias {
format!("{0} AS {alias}", self.table_name)
format!("{0} AS {alias}", self.table)
} else {
self.table_name.to_string()
self.table.to_string()
};
if let Some(on_conflict) = self.or {
@ -531,7 +530,7 @@ impl Display for Insert {
ignore = if self.ignore { " IGNORE" } else { "" },
over = if self.overwrite { " OVERWRITE" } else { "" },
int = if self.into { " INTO" } else { "" },
tbl = if self.table { " TABLE" } else { "" },
tbl = if self.has_table_keyword { " TABLE" } else { "" },
)?;
}
if !self.columns.is_empty() {

View file

@ -7907,6 +7907,36 @@ impl fmt::Display for RenameTable {
}
}
/// Represents the referenced table in an `INSERT INTO` statement
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
pub enum TableObject {
/// Table specified by name.
/// Example:
/// ```sql
/// INSERT INTO my_table
/// ```
TableName(#[cfg_attr(feature = "visitor", visit(with = "visit_relation"))] ObjectName),
/// Table specified as a function.
/// Example:
/// ```sql
/// INSERT INTO TABLE FUNCTION remote('localhost', default.simple_table)
/// ```
/// [Clickhouse](https://clickhouse.com/docs/en/sql-reference/table-functions)
TableFunction(Function),
}
impl fmt::Display for TableObject {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::TableName(table_name) => write!(f, "{table_name}"),
Self::TableFunction(func) => write!(f, "FUNCTION {}", func),
}
}
}
#[cfg(test)]
mod tests {
use super::*;

View file

@ -32,8 +32,9 @@ use super::{
OrderBy, OrderByExpr, Partition, PivotValueSource, ProjectionSelect, Query, ReferentialAction,
RenameSelectItem, ReplaceSelectElement, ReplaceSelectItem, Select, SelectInto, SelectItem,
SetExpr, SqlOption, Statement, Subscript, SymbolDefinition, TableAlias, TableAliasColumnDef,
TableConstraint, TableFactor, TableOptionsClustered, TableWithJoins, UpdateTableFromKind, Use,
Value, Values, ViewColumnDef, WildcardAdditionalOptions, With, WithFill,
TableConstraint, TableFactor, TableObject, TableOptionsClustered, TableWithJoins,
UpdateTableFromKind, Use, Value, Values, ViewColumnDef, WildcardAdditionalOptions, With,
WithFill,
};
/// Given an iterator of spans, return the [Span::union] of all spans.
@ -1141,14 +1142,14 @@ impl Spanned for Insert {
or: _, // enum, sqlite specific
ignore: _, // bool
into: _, // bool
table_name,
table,
table_alias,
columns,
overwrite: _, // bool
source,
partitioned,
after_columns,
table: _, // bool
has_table_keyword: _, // bool
on,
returning,
replace_into: _, // bool
@ -1158,7 +1159,7 @@ impl Spanned for Insert {
} = self;
union_spans(
core::iter::once(table_name.span())
core::iter::once(table.span())
.chain(table_alias.as_ref().map(|i| i.span))
.chain(columns.iter().map(|i| i.span))
.chain(source.as_ref().map(|q| q.span()))
@ -2121,6 +2122,17 @@ impl Spanned for UpdateTableFromKind {
}
}
impl Spanned for TableObject {
fn span(&self) -> Span {
match self {
TableObject::TableName(ObjectName(segments)) => {
union_spans(segments.iter().map(|i| i.span))
}
TableObject::TableFunction(func) => func.span(),
}
}
}
#[cfg(test)]
pub mod tests {
use crate::dialect::{Dialect, GenericDialect, SnowflakeDialect};

View file

@ -50,4 +50,8 @@ impl Dialect for ClickHouseDialect {
fn supports_limit_comma(&self) -> bool {
true
}
fn supports_insert_table_function(&self) -> bool {
true
}
}

View file

@ -792,6 +792,11 @@ pub trait Dialect: Debug + Any {
fn supports_insert_set(&self) -> bool {
false
}
/// Does the dialect support table function in insertion?
fn supports_insert_table_function(&self) -> bool {
false
}
}
/// This represents the operators for which precedence must be defined

View file

@ -8937,6 +8937,18 @@ impl<'a> Parser<'a> {
}
}
/// Parse a table object for insetion
/// e.g. `some_database.some_table` or `FUNCTION some_table_func(...)`
pub fn parse_table_object(&mut self) -> Result<TableObject, ParserError> {
if self.dialect.supports_insert_table_function() && self.parse_keyword(Keyword::FUNCTION) {
let fn_name = self.parse_object_name(false)?;
self.parse_function_call(fn_name)
.map(TableObject::TableFunction)
} else {
self.parse_object_name(false).map(TableObject::TableName)
}
}
/// Parse a possibly qualified, possibly quoted identifier, optionally allowing for wildcards,
/// e.g. *, *.*, `foo`.*, or "foo"."bar"
fn parse_object_name_with_wildcards(
@ -12010,7 +12022,7 @@ impl<'a> Parser<'a> {
} else {
// Hive lets you put table here regardless
let table = self.parse_keyword(Keyword::TABLE);
let table_name = self.parse_object_name(false)?;
let table_object = self.parse_table_object()?;
let table_alias =
if dialect_of!(self is PostgreSqlDialect) && self.parse_keyword(Keyword::AS) {
@ -12118,7 +12130,7 @@ impl<'a> Parser<'a> {
Ok(Statement::Insert(Insert {
or,
table_name,
table: table_object,
table_alias,
ignore,
into,
@ -12128,7 +12140,7 @@ impl<'a> Parser<'a> {
after_columns,
source,
assignments,
table,
has_table_keyword: table,
on,
returning,
replace_into,

View file

@ -154,7 +154,6 @@ impl TestedDialects {
pub fn one_statement_parses_to(&self, sql: &str, canonical: &str) -> Statement {
let mut statements = self.parse_sql_statements(sql).expect(sql);
assert_eq!(statements.len(), 1);
if !canonical.is_empty() && sql != canonical {
assert_eq!(self.parse_sql_statements(canonical).unwrap(), statements);
}

View file

@ -222,6 +222,12 @@ fn parse_create_table() {
);
}
#[test]
fn parse_insert_into_function() {
clickhouse().verified_stmt(r#"INSERT INTO TABLE FUNCTION remote('localhost', default.simple_table) VALUES (100, 'inserted via remote()')"#);
clickhouse().verified_stmt(r#"INSERT INTO FUNCTION remote('localhost', default.simple_table) VALUES (100, 'inserted via remote()')"#);
}
#[test]
fn parse_alter_table_attach_and_detach_partition() {
for operation in &["ATTACH", "DETACH"] {

View file

@ -96,7 +96,7 @@ fn parse_insert_values() {
) {
match verified_stmt(sql) {
Statement::Insert(Insert {
table_name,
table: table_name,
columns,
source: Some(source),
..
@ -149,7 +149,7 @@ fn parse_insert_default_values() {
partitioned,
returning,
source,
table_name,
table: table_name,
..
}) => {
assert_eq!(columns, vec![]);
@ -158,7 +158,10 @@ fn parse_insert_default_values() {
assert_eq!(partitioned, None);
assert_eq!(returning, None);
assert_eq!(source, None);
assert_eq!(table_name, ObjectName(vec!["test_table".into()]));
assert_eq!(
table_name,
TableObject::TableName(ObjectName(vec!["test_table".into()]))
);
}
_ => unreachable!(),
}
@ -174,7 +177,7 @@ fn parse_insert_default_values() {
partitioned,
returning,
source,
table_name,
table: table_name,
..
}) => {
assert_eq!(after_columns, vec![]);
@ -183,7 +186,10 @@ fn parse_insert_default_values() {
assert_eq!(partitioned, None);
assert!(returning.is_some());
assert_eq!(source, None);
assert_eq!(table_name, ObjectName(vec!["test_table".into()]));
assert_eq!(
table_name,
TableObject::TableName(ObjectName(vec!["test_table".into()]))
);
}
_ => unreachable!(),
}
@ -199,7 +205,7 @@ fn parse_insert_default_values() {
partitioned,
returning,
source,
table_name,
table: table_name,
..
}) => {
assert_eq!(after_columns, vec![]);
@ -208,7 +214,10 @@ fn parse_insert_default_values() {
assert_eq!(partitioned, None);
assert_eq!(returning, None);
assert_eq!(source, None);
assert_eq!(table_name, ObjectName(vec!["test_table".into()]));
assert_eq!(
table_name,
TableObject::TableName(ObjectName(vec!["test_table".into()]))
);
}
_ => unreachable!(),
}

View file

@ -1406,13 +1406,16 @@ fn parse_simple_insert() {
match mysql().verified_stmt(sql) {
Statement::Insert(Insert {
table_name,
table: table_name,
columns,
source,
on,
..
}) => {
assert_eq!(ObjectName(vec![Ident::new("tasks")]), table_name);
assert_eq!(
TableObject::TableName(ObjectName(vec![Ident::new("tasks")])),
table_name
);
assert_eq!(vec![Ident::new("title"), Ident::new("priority")], columns);
assert!(on.is_none());
assert_eq!(
@ -1460,14 +1463,17 @@ fn parse_ignore_insert() {
match mysql_and_generic().verified_stmt(sql) {
Statement::Insert(Insert {
table_name,
table: table_name,
columns,
source,
on,
ignore,
..
}) => {
assert_eq!(ObjectName(vec![Ident::new("tasks")]), table_name);
assert_eq!(
TableObject::TableName(ObjectName(vec![Ident::new("tasks")])),
table_name
);
assert_eq!(vec![Ident::new("title"), Ident::new("priority")], columns);
assert!(on.is_none());
assert!(ignore);
@ -1504,14 +1510,17 @@ fn parse_priority_insert() {
match mysql_and_generic().verified_stmt(sql) {
Statement::Insert(Insert {
table_name,
table: table_name,
columns,
source,
on,
priority,
..
}) => {
assert_eq!(ObjectName(vec![Ident::new("tasks")]), table_name);
assert_eq!(
TableObject::TableName(ObjectName(vec![Ident::new("tasks")])),
table_name
);
assert_eq!(vec![Ident::new("title"), Ident::new("priority")], columns);
assert!(on.is_none());
assert_eq!(priority, Some(HighPriority));
@ -1545,14 +1554,17 @@ fn parse_priority_insert() {
match mysql().verified_stmt(sql2) {
Statement::Insert(Insert {
table_name,
table: table_name,
columns,
source,
on,
priority,
..
}) => {
assert_eq!(ObjectName(vec![Ident::new("tasks")]), table_name);
assert_eq!(
TableObject::TableName(ObjectName(vec![Ident::new("tasks")])),
table_name
);
assert_eq!(vec![Ident::new("title"), Ident::new("priority")], columns);
assert!(on.is_none());
assert_eq!(priority, Some(LowPriority));
@ -1588,14 +1600,14 @@ fn parse_insert_as() {
let sql = r"INSERT INTO `table` (`date`) VALUES ('2024-01-01') AS `alias`";
match mysql_and_generic().verified_stmt(sql) {
Statement::Insert(Insert {
table_name,
table: table_name,
columns,
source,
insert_alias,
..
}) => {
assert_eq!(
ObjectName(vec![Ident::with_quote('`', "table")]),
TableObject::TableName(ObjectName(vec![Ident::with_quote('`', "table")])),
table_name
);
assert_eq!(vec![Ident::with_quote('`', "date")], columns);
@ -1640,14 +1652,14 @@ fn parse_insert_as() {
let sql = r"INSERT INTO `table` (`id`, `date`) VALUES (1, '2024-01-01') AS `alias` (`mek_id`, `mek_date`)";
match mysql_and_generic().verified_stmt(sql) {
Statement::Insert(Insert {
table_name,
table: table_name,
columns,
source,
insert_alias,
..
}) => {
assert_eq!(
ObjectName(vec![Ident::with_quote('`', "table")]),
TableObject::TableName(ObjectName(vec![Ident::with_quote('`', "table")])),
table_name
);
assert_eq!(
@ -1698,7 +1710,7 @@ fn parse_replace_insert() {
let sql = r"REPLACE DELAYED INTO tasks (title, priority) VALUES ('Test Some Inserts', 1)";
match mysql().verified_stmt(sql) {
Statement::Insert(Insert {
table_name,
table: table_name,
columns,
source,
on,
@ -1706,7 +1718,10 @@ fn parse_replace_insert() {
priority,
..
}) => {
assert_eq!(ObjectName(vec![Ident::new("tasks")]), table_name);
assert_eq!(
TableObject::TableName(ObjectName(vec![Ident::new("tasks")])),
table_name
);
assert_eq!(vec![Ident::new("title"), Ident::new("priority")], columns);
assert!(on.is_none());
assert!(replace_into);
@ -1744,13 +1759,16 @@ fn parse_empty_row_insert() {
match mysql().one_statement_parses_to(sql, "INSERT INTO tb VALUES (), ()") {
Statement::Insert(Insert {
table_name,
table: table_name,
columns,
source,
on,
..
}) => {
assert_eq!(ObjectName(vec![Ident::new("tb")]), table_name);
assert_eq!(
TableObject::TableName(ObjectName(vec![Ident::new("tb")])),
table_name
);
assert!(columns.is_empty());
assert!(on.is_none());
assert_eq!(
@ -1783,14 +1801,14 @@ fn parse_insert_with_on_duplicate_update() {
match mysql().verified_stmt(sql) {
Statement::Insert(Insert {
table_name,
table: table_name,
columns,
source,
on,
..
}) => {
assert_eq!(
ObjectName(vec![Ident::new("permission_groups")]),
TableObject::TableName(ObjectName(vec![Ident::new("permission_groups")])),
table_name
);
assert_eq!(
@ -1974,12 +1992,12 @@ fn parse_insert_with_numeric_prefix_column_name() {
let sql = "INSERT INTO s1.t1 (123col_$@length123) VALUES (67.654)";
match mysql().verified_stmt(sql) {
Statement::Insert(Insert {
table_name,
table: table_name,
columns,
..
}) => {
assert_eq!(
ObjectName(vec![Ident::new("s1"), Ident::new("t1")]),
TableObject::TableName(ObjectName(vec![Ident::new("s1"), Ident::new("t1")])),
table_name
);
assert_eq!(vec![Ident::new("123col_$@length123")], columns);

View file

@ -1725,7 +1725,7 @@ fn parse_prepare() {
};
match sub_stmt.as_ref() {
Statement::Insert(Insert {
table_name,
table: table_name,
columns,
source: Some(source),
..
@ -4381,11 +4381,11 @@ fn test_simple_postgres_insert_with_alias() {
or: None,
ignore: false,
into: true,
table_name: ObjectName(vec![Ident {
table: TableObject::TableName(ObjectName(vec![Ident {
value: "test_tables".to_string(),
quote_style: None,
span: Span::empty(),
}]),
}])),
table_alias: Some(Ident {
value: "test_table".to_string(),
quote_style: None,
@ -4426,7 +4426,7 @@ fn test_simple_postgres_insert_with_alias() {
assignments: vec![],
partitioned: None,
after_columns: vec![],
table: false,
has_table_keyword: false,
on: None,
returning: None,
replace_into: false,
@ -4449,11 +4449,11 @@ fn test_simple_postgres_insert_with_alias() {
or: None,
ignore: false,
into: true,
table_name: ObjectName(vec![Ident {
table: TableObject::TableName(ObjectName(vec![Ident {
value: "test_tables".to_string(),
quote_style: None,
span: Span::empty(),
}]),
}])),
table_alias: Some(Ident {
value: "test_table".to_string(),
quote_style: None,
@ -4497,7 +4497,7 @@ fn test_simple_postgres_insert_with_alias() {
assignments: vec![],
partitioned: None,
after_columns: vec![],
table: false,
has_table_keyword: false,
on: None,
returning: None,
replace_into: false,
@ -4519,11 +4519,11 @@ fn test_simple_insert_with_quoted_alias() {
or: None,
ignore: false,
into: true,
table_name: ObjectName(vec![Ident {
table: TableObject::TableName(ObjectName(vec![Ident {
value: "test_tables".to_string(),
quote_style: None,
span: Span::empty(),
}]),
}])),
table_alias: Some(Ident {
value: "Test_Table".to_string(),
quote_style: Some('"'),
@ -4564,7 +4564,7 @@ fn test_simple_insert_with_quoted_alias() {
assignments: vec![],
partitioned: None,
after_columns: vec![],
table: false,
has_table_keyword: false,
on: None,
returning: None,
replace_into: false,