feat: support export data for bigquery (#1976)
Some checks are pending
license / Release Audit Tool (RAT) (push) Waiting to run
Rust / codestyle (push) Waiting to run
Rust / lint (push) Waiting to run
Rust / benchmark-lint (push) Waiting to run
Rust / compile (push) Waiting to run
Rust / docs (push) Waiting to run
Rust / compile-no-std (push) Waiting to run
Rust / test (beta) (push) Waiting to run
Rust / test (nightly) (push) Waiting to run
Rust / test (stable) (push) Waiting to run

This commit is contained in:
Chen Chongchen 2025-07-28 21:17:51 +08:00 committed by GitHub
parent 5ec953bd78
commit 97a5b61a73
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 309 additions and 2 deletions

View file

@ -4355,6 +4355,15 @@ pub enum Statement {
///
/// See [ReturnStatement]
Return(ReturnStatement),
/// Export data statement
///
/// Example:
/// ```sql
/// EXPORT DATA OPTIONS(uri='gs://bucket/folder/*', format='PARQUET', overwrite=true) AS
/// SELECT field1, field2 FROM mydataset.table1 ORDER BY field1 LIMIT 10
/// ```
/// [BigQuery](https://cloud.google.com/bigquery/docs/reference/standard-sql/export-statements)
ExportData(ExportData),
/// ```sql
/// CREATE [OR REPLACE] USER <user> [IF NOT EXISTS]
/// ```
@ -6198,6 +6207,7 @@ impl fmt::Display for Statement {
Statement::Return(r) => write!(f, "{r}"),
Statement::List(command) => write!(f, "LIST {command}"),
Statement::Remove(command) => write!(f, "REMOVE {command}"),
Statement::ExportData(e) => write!(f, "{e}"),
Statement::CreateUser(s) => write!(f, "{s}"),
}
}
@ -10144,6 +10154,34 @@ impl fmt::Display for MemberOf {
}
}
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
pub struct ExportData {
pub options: Vec<SqlOption>,
pub query: Box<Query>,
pub connection: Option<ObjectName>,
}
impl fmt::Display for ExportData {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if let Some(connection) = &self.connection {
write!(
f,
"EXPORT DATA WITH CONNECTION {connection} OPTIONS({}) AS {}",
display_comma_separated(&self.options),
self.query
)
} else {
write!(
f,
"EXPORT DATA OPTIONS({}) AS {}",
display_comma_separated(&self.options),
self.query
)
}
}
}
/// Creates a user
///
/// Syntax:

View file

@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
use crate::ast::{query::SelectItemQualifiedWildcardKind, ColumnOptions};
use crate::ast::{query::SelectItemQualifiedWildcardKind, ColumnOptions, ExportData};
use core::iter;
use crate::tokenizer::Span;
@ -531,6 +531,17 @@ impl Spanned for Statement {
Statement::Print { .. } => Span::empty(),
Statement::Return { .. } => Span::empty(),
Statement::List(..) | Statement::Remove(..) => Span::empty(),
Statement::ExportData(ExportData {
options,
query,
connection,
}) => union_spans(
options
.iter()
.map(|i| i.span())
.chain(core::iter::once(query.span()))
.chain(connection.iter().map(|i| i.span())),
),
Statement::CreateUser(..) => Span::empty(),
}
}

View file

@ -645,6 +645,10 @@ impl<'a> Parser<'a> {
Keyword::COMMENT if self.dialect.supports_comment_on() => self.parse_comment(),
Keyword::PRINT => self.parse_print(),
Keyword::RETURN => self.parse_return(),
Keyword::EXPORT => {
self.prev_token();
self.parse_export_data()
}
_ => self.expected("an SQL statement", next_token),
},
Token::LParen => {
@ -16523,6 +16527,30 @@ impl<'a> Parser<'a> {
}
}
/// /// Parse a `EXPORT DATA` statement.
///
/// See [Statement::ExportData]
fn parse_export_data(&mut self) -> Result<Statement, ParserError> {
self.expect_keywords(&[Keyword::EXPORT, Keyword::DATA])?;
let connection = if self.parse_keywords(&[Keyword::WITH, Keyword::CONNECTION]) {
Some(self.parse_object_name(false)?)
} else {
None
};
self.expect_keyword(Keyword::OPTIONS)?;
self.expect_token(&Token::LParen)?;
let options = self.parse_comma_separated(|p| p.parse_sql_option())?;
self.expect_token(&Token::RParen)?;
self.expect_keyword(Keyword::AS)?;
let query = self.parse_query()?;
Ok(Statement::ExportData(ExportData {
options,
query,
connection,
}))
}
/// Consume the parser and return its underlying token buffer
pub fn into_tokens(self) -> Vec<TokenWithSpan> {
self.tokens

View file

@ -20,10 +20,12 @@ mod test_utils;
use std::ops::Deref;
use sqlparser::ast::helpers::attached_token::AttachedToken;
use sqlparser::ast::*;
use sqlparser::dialect::{BigQueryDialect, GenericDialect};
use sqlparser::keywords::Keyword;
use sqlparser::parser::{ParserError, ParserOptions};
use sqlparser::tokenizer::{Location, Span};
use sqlparser::tokenizer::{Location, Span, Token, TokenWithSpan, Word};
use test_utils::*;
#[test]
@ -2567,6 +2569,234 @@ fn test_struct_trailing_and_nested_bracket() {
);
}
#[test]
fn test_export_data() {
let stmt = bigquery().verified_stmt(concat!(
"EXPORT DATA OPTIONS(",
"uri = 'gs://bucket/folder/*', ",
"format = 'PARQUET', ",
"overwrite = true",
") AS ",
"SELECT field1, field2 FROM mydataset.table1 ORDER BY field1 LIMIT 10",
));
assert_eq!(
stmt,
Statement::ExportData(ExportData {
options: vec![
SqlOption::KeyValue {
key: Ident::new("uri"),
value: Expr::Value(
Value::SingleQuotedString("gs://bucket/folder/*".to_owned())
.with_empty_span()
),
},
SqlOption::KeyValue {
key: Ident::new("format"),
value: Expr::Value(
Value::SingleQuotedString("PARQUET".to_owned()).with_empty_span()
),
},
SqlOption::KeyValue {
key: Ident::new("overwrite"),
value: Expr::Value(Value::Boolean(true).with_empty_span()),
},
],
connection: None,
query: Box::new(Query {
with: None,
body: Box::new(SetExpr::Select(Box::new(Select {
select_token: AttachedToken(TokenWithSpan::new(
Token::Word(Word {
value: "SELECT".to_string(),
quote_style: None,
keyword: Keyword::SELECT,
}),
Span::empty()
)),
distinct: None,
top: None,
top_before_distinct: false,
projection: vec![
SelectItem::UnnamedExpr(Expr::Identifier(Ident::new("field1"))),
SelectItem::UnnamedExpr(Expr::Identifier(Ident::new("field2"))),
],
exclude: None,
into: None,
from: vec![TableWithJoins {
relation: table_from_name(ObjectName::from(vec![
Ident::new("mydataset"),
Ident::new("table1")
])),
joins: vec![],
}],
lateral_views: vec![],
prewhere: None,
selection: None,
group_by: GroupByExpr::Expressions(vec![], vec![]),
cluster_by: vec![],
distribute_by: vec![],
sort_by: vec![],
having: None,
named_window: vec![],
qualify: None,
window_before_qualify: false,
value_table_mode: None,
connect_by: None,
flavor: SelectFlavor::Standard,
}))),
order_by: Some(OrderBy {
kind: OrderByKind::Expressions(vec![OrderByExpr {
expr: Expr::Identifier(Ident::new("field1")),
options: OrderByOptions {
asc: None,
nulls_first: None,
},
with_fill: None,
},]),
interpolate: None,
}),
limit_clause: Some(LimitClause::LimitOffset {
limit: Some(Expr::Value(number("10").with_empty_span())),
offset: None,
limit_by: vec![],
}),
fetch: None,
locks: vec![],
for_clause: None,
settings: None,
format_clause: None,
pipe_operators: vec![],
})
})
);
let stmt = bigquery().verified_stmt(concat!(
"EXPORT DATA WITH CONNECTION myconnection.myproject.us OPTIONS(",
"uri = 'gs://bucket/folder/*', ",
"format = 'PARQUET', ",
"overwrite = true",
") AS ",
"SELECT field1, field2 FROM mydataset.table1 ORDER BY field1 LIMIT 10",
));
assert_eq!(
stmt,
Statement::ExportData(ExportData {
options: vec![
SqlOption::KeyValue {
key: Ident::new("uri"),
value: Expr::Value(
Value::SingleQuotedString("gs://bucket/folder/*".to_owned())
.with_empty_span()
),
},
SqlOption::KeyValue {
key: Ident::new("format"),
value: Expr::Value(
Value::SingleQuotedString("PARQUET".to_owned()).with_empty_span()
),
},
SqlOption::KeyValue {
key: Ident::new("overwrite"),
value: Expr::Value(Value::Boolean(true).with_empty_span()),
},
],
connection: Some(ObjectName::from(vec![
Ident::new("myconnection"),
Ident::new("myproject"),
Ident::new("us")
])),
query: Box::new(Query {
with: None,
body: Box::new(SetExpr::Select(Box::new(Select {
select_token: AttachedToken(TokenWithSpan::new(
Token::Word(Word {
value: "SELECT".to_string(),
quote_style: None,
keyword: Keyword::SELECT,
}),
Span::empty()
)),
distinct: None,
top: None,
top_before_distinct: false,
projection: vec![
SelectItem::UnnamedExpr(Expr::Identifier(Ident::new("field1"))),
SelectItem::UnnamedExpr(Expr::Identifier(Ident::new("field2"))),
],
exclude: None,
into: None,
from: vec![TableWithJoins {
relation: table_from_name(ObjectName::from(vec![
Ident::new("mydataset"),
Ident::new("table1")
])),
joins: vec![],
}],
lateral_views: vec![],
prewhere: None,
selection: None,
group_by: GroupByExpr::Expressions(vec![], vec![]),
cluster_by: vec![],
distribute_by: vec![],
sort_by: vec![],
having: None,
named_window: vec![],
qualify: None,
window_before_qualify: false,
value_table_mode: None,
connect_by: None,
flavor: SelectFlavor::Standard,
}))),
order_by: Some(OrderBy {
kind: OrderByKind::Expressions(vec![OrderByExpr {
expr: Expr::Identifier(Ident::new("field1")),
options: OrderByOptions {
asc: None,
nulls_first: None,
},
with_fill: None,
},]),
interpolate: None,
}),
limit_clause: Some(LimitClause::LimitOffset {
limit: Some(Expr::Value(number("10").with_empty_span())),
offset: None,
limit_by: vec![],
}),
fetch: None,
locks: vec![],
for_clause: None,
settings: None,
format_clause: None,
pipe_operators: vec![],
})
})
);
// at least one option (uri) is required
let err = bigquery()
.parse_sql_statements(concat!(
"EXPORT DATA OPTIONS() AS ",
"SELECT field1, field2 FROM mydataset.table1 ORDER BY field1 LIMIT 10",
))
.unwrap_err();
assert_eq!(
err.to_string(),
"sql parser error: Expected: identifier, found: )"
);
let err = bigquery()
.parse_sql_statements(concat!(
"EXPORT DATA AS SELECT field1, field2 FROM mydataset.table1 ORDER BY field1 LIMIT 10",
))
.unwrap_err();
assert_eq!(
err.to_string(),
"sql parser error: Expected: OPTIONS, found: AS"
);
}
#[test]
fn test_begin_transaction() {
bigquery().verified_stmt("BEGIN TRANSACTION");