mirror of
https://github.com/apache/datafusion-sqlparser-rs.git
synced 2025-12-23 11:12:51 +00:00
Replaced CSV with custom csv parser
This commit is contained in:
parent
5120d8c051
commit
07a828f661
4 changed files with 182 additions and 60 deletions
|
|
@ -47,7 +47,6 @@ visitor = ["sqlparser_derive"]
|
|||
[dependencies]
|
||||
bigdecimal = { version = "0.4.1", features = ["serde"], optional = true }
|
||||
log = "0.4"
|
||||
csv = "1.4.0"
|
||||
recursive = { version = "0.1.1", optional = true}
|
||||
|
||||
serde = { version = "1.0", default-features = false, features = ["derive", "alloc"], optional = true }
|
||||
|
|
|
|||
|
|
@ -4576,19 +4576,21 @@ impl fmt::Display for Statement {
|
|||
}
|
||||
|
||||
let mut null_symbol = "\\N";
|
||||
let mut writer_builder = csv::WriterBuilder::new();
|
||||
let mut delimiter = '\t';
|
||||
let mut quote = '"';
|
||||
let mut escape = '\\';
|
||||
|
||||
// Apply options
|
||||
for option in options {
|
||||
match option {
|
||||
CopyOption::Delimiter(c) => {
|
||||
writer_builder.delimiter(*c as u8);
|
||||
delimiter = *c;
|
||||
}
|
||||
CopyOption::Quote(c) => {
|
||||
writer_builder.quote(*c as u8);
|
||||
quote = *c;
|
||||
}
|
||||
CopyOption::Escape(c) => {
|
||||
writer_builder.escape(*c as u8);
|
||||
escape = *c;
|
||||
}
|
||||
CopyOption::Null(null) => {
|
||||
null_symbol = null;
|
||||
|
|
@ -4601,10 +4603,7 @@ impl fmt::Display for Statement {
|
|||
for option in legacy_options {
|
||||
match option {
|
||||
CopyLegacyOption::Delimiter(c) => {
|
||||
writer_builder.delimiter(*c as u8);
|
||||
}
|
||||
CopyLegacyOption::Header => {
|
||||
writer_builder.has_headers(true);
|
||||
delimiter = *c;
|
||||
}
|
||||
CopyLegacyOption::Null(null) => {
|
||||
null_symbol = null;
|
||||
|
|
@ -4612,14 +4611,11 @@ impl fmt::Display for Statement {
|
|||
CopyLegacyOption::Csv(csv_options) => {
|
||||
for csv_option in csv_options {
|
||||
match csv_option {
|
||||
CopyLegacyCsvOption::Header => {
|
||||
writer_builder.has_headers(true);
|
||||
}
|
||||
CopyLegacyCsvOption::Quote(c) => {
|
||||
writer_builder.quote(*c as u8);
|
||||
quote = *c;
|
||||
}
|
||||
CopyLegacyCsvOption::Escape(c) => {
|
||||
writer_builder.escape(*c as u8);
|
||||
escape = *c;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
|
@ -4631,19 +4627,43 @@ impl fmt::Display for Statement {
|
|||
|
||||
if !values.is_empty() {
|
||||
writeln!(f, ";")?;
|
||||
let mut writer = writer_builder.from_writer(vec![]);
|
||||
|
||||
// Simple CSV writer
|
||||
for row in values {
|
||||
writer
|
||||
.write_record(
|
||||
row.iter()
|
||||
.map(|column| column.as_deref().unwrap_or(null_symbol)),
|
||||
)
|
||||
.map_err(|_| fmt::Error)?
|
||||
for (idx, column) in row.iter().enumerate() {
|
||||
if idx > 0 {
|
||||
write!(f, "{}", delimiter)?;
|
||||
}
|
||||
|
||||
let field_value = column.as_deref().unwrap_or(null_symbol);
|
||||
|
||||
// Check if field needs quoting
|
||||
let needs_quoting = field_value.contains(delimiter)
|
||||
|| field_value.contains(quote)
|
||||
|| field_value.contains('\n')
|
||||
|| field_value.contains('\r');
|
||||
|
||||
if needs_quoting {
|
||||
write!(f, "{}", quote)?;
|
||||
for ch in field_value.chars() {
|
||||
if ch == quote {
|
||||
// Escape quote by doubling it
|
||||
write!(f, "{}{}", quote, quote)?;
|
||||
} else if ch == escape {
|
||||
// Escape escape character
|
||||
write!(f, "{}{}", escape, escape)?;
|
||||
} else {
|
||||
write!(f, "{}", ch)?;
|
||||
}
|
||||
}
|
||||
write!(f, "{}", quote)?;
|
||||
} else {
|
||||
write!(f, "{}", field_value)?;
|
||||
}
|
||||
}
|
||||
writeln!(f)?;
|
||||
}
|
||||
writer.flush().map_err(|_| fmt::Error)?;
|
||||
let data = String::from_utf8(writer.into_inner().map_err(|_| fmt::Error)?)
|
||||
.map_err(|_| fmt::Error)?;
|
||||
write!(f, "{}", data)?;
|
||||
|
||||
write!(f, "\\.")?;
|
||||
}
|
||||
Ok(())
|
||||
|
|
|
|||
|
|
@ -9542,25 +9542,22 @@ impl<'a> Parser<'a> {
|
|||
return self.expected("COPY ... FROM STDIN with CSV body", self.peek_token());
|
||||
};
|
||||
|
||||
let mut reader_builder = csv::ReaderBuilder::new();
|
||||
reader_builder.has_headers(false);
|
||||
|
||||
let mut delimiter = '\t';
|
||||
let mut quote = '"';
|
||||
let mut escape = '\\';
|
||||
let mut null_symbol = "\\N";
|
||||
|
||||
// Apply options
|
||||
for option in options {
|
||||
match option {
|
||||
CopyOption::Delimiter(c) => {
|
||||
reader_builder.delimiter(*c as u8);
|
||||
}
|
||||
CopyOption::Header(has_header) => {
|
||||
reader_builder.has_headers(*has_header);
|
||||
delimiter = *c;
|
||||
}
|
||||
CopyOption::Quote(c) => {
|
||||
reader_builder.quote(*c as u8);
|
||||
quote = *c;
|
||||
}
|
||||
CopyOption::Escape(c) => {
|
||||
reader_builder.escape(Some(*c as u8));
|
||||
escape = *c;
|
||||
}
|
||||
CopyOption::Null(null) => {
|
||||
null_symbol = null;
|
||||
|
|
@ -9573,10 +9570,7 @@ impl<'a> Parser<'a> {
|
|||
for option in legacy_options {
|
||||
match option {
|
||||
CopyLegacyOption::Delimiter(c) => {
|
||||
reader_builder.delimiter(*c as u8);
|
||||
}
|
||||
CopyLegacyOption::Header => {
|
||||
reader_builder.has_headers(true);
|
||||
delimiter = *c;
|
||||
}
|
||||
CopyLegacyOption::Null(null) => {
|
||||
null_symbol = null;
|
||||
|
|
@ -9584,14 +9578,11 @@ impl<'a> Parser<'a> {
|
|||
CopyLegacyOption::Csv(csv_options) => {
|
||||
for csv_option in csv_options {
|
||||
match csv_option {
|
||||
CopyLegacyCsvOption::Header => {
|
||||
reader_builder.has_headers(true);
|
||||
}
|
||||
CopyLegacyCsvOption::Quote(c) => {
|
||||
reader_builder.quote(*c as u8);
|
||||
quote = *c;
|
||||
}
|
||||
CopyLegacyCsvOption::Escape(c) => {
|
||||
reader_builder.escape(Some(*c as u8));
|
||||
escape = *c;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
|
@ -9601,28 +9592,116 @@ impl<'a> Parser<'a> {
|
|||
}
|
||||
}
|
||||
|
||||
// Simple CSV parser
|
||||
let mut result = vec![];
|
||||
let mut reader = reader_builder.from_reader(body.as_bytes());
|
||||
for record in reader.records() {
|
||||
let record = match record {
|
||||
Ok(rec) => rec,
|
||||
Err(e) => {
|
||||
return Err(ParserError::ParserError(format!(
|
||||
"Error parsing CSV data: {}",
|
||||
e
|
||||
)))
|
||||
}
|
||||
};
|
||||
let mut row = vec![];
|
||||
for field in record.iter() {
|
||||
if field == null_symbol {
|
||||
row.push(None);
|
||||
let mut current_row = vec![];
|
||||
let mut current_field = String::new();
|
||||
let mut in_quotes = false;
|
||||
let mut chars = body.chars().peekable();
|
||||
let mut expected_column_count: Option<usize> = None;
|
||||
let mut row_number = 0;
|
||||
|
||||
while let Some(ch) = chars.next() {
|
||||
if in_quotes {
|
||||
if ch == quote {
|
||||
// Check if it's an escaped quote
|
||||
if let Some(&next_ch) = chars.peek() {
|
||||
if next_ch == quote {
|
||||
// Escaped quote
|
||||
current_field.push(quote);
|
||||
chars.next();
|
||||
} else {
|
||||
// End of quoted field
|
||||
in_quotes = false;
|
||||
}
|
||||
} else {
|
||||
// End of quoted field at end of input
|
||||
in_quotes = false;
|
||||
}
|
||||
} else if ch == escape {
|
||||
// Escape character
|
||||
if let Some(next_ch) = chars.next() {
|
||||
current_field.push(next_ch);
|
||||
}
|
||||
} else {
|
||||
row.push(Some(field.to_string()));
|
||||
current_field.push(ch);
|
||||
}
|
||||
} else if ch == quote {
|
||||
in_quotes = true;
|
||||
} else if ch == delimiter {
|
||||
// End of field
|
||||
if current_field == null_symbol {
|
||||
current_row.push(None);
|
||||
} else {
|
||||
current_row.push(Some(current_field.clone()));
|
||||
}
|
||||
current_field.clear();
|
||||
} else if ch == '\n' || ch == '\r' {
|
||||
// End of record
|
||||
if ch == '\r' {
|
||||
// Skip \n if it follows \r
|
||||
if let Some(&'\n') = chars.peek() {
|
||||
chars.next();
|
||||
}
|
||||
}
|
||||
if !current_field.is_empty() || !current_row.is_empty() {
|
||||
if current_field == null_symbol {
|
||||
current_row.push(None);
|
||||
} else {
|
||||
current_row.push(Some(current_field.clone()));
|
||||
}
|
||||
current_field.clear();
|
||||
|
||||
// Validate column count
|
||||
row_number += 1;
|
||||
if let Some(expected) = expected_column_count {
|
||||
if current_row.len() != expected {
|
||||
return Err(ParserError::ParserError(format!(
|
||||
"CSV row {} has {} columns, but expected {} columns based on first row",
|
||||
row_number,
|
||||
current_row.len(),
|
||||
expected
|
||||
)));
|
||||
}
|
||||
} else {
|
||||
// First row establishes the expected column count
|
||||
expected_column_count = Some(current_row.len());
|
||||
}
|
||||
|
||||
result.push(current_row.clone());
|
||||
current_row.clear();
|
||||
}
|
||||
} else {
|
||||
current_field.push(ch);
|
||||
}
|
||||
}
|
||||
|
||||
// Handle remaining field/row
|
||||
if !current_field.is_empty() || !current_row.is_empty() {
|
||||
if current_field == null_symbol {
|
||||
current_row.push(None);
|
||||
} else {
|
||||
current_row.push(Some(current_field));
|
||||
}
|
||||
|
||||
// Validate column count for last row
|
||||
row_number += 1;
|
||||
if let Some(expected) = expected_column_count {
|
||||
if current_row.len() != expected {
|
||||
return Err(ParserError::ParserError(format!(
|
||||
"CSV row {} has {} columns, but expected {} columns based on first row",
|
||||
row_number,
|
||||
current_row.len(),
|
||||
expected
|
||||
)));
|
||||
}
|
||||
}
|
||||
result.push(row);
|
||||
// Note: if this is the first and only row, we don't need to set expected_column_count
|
||||
// since there's nothing to validate against
|
||||
|
||||
result.push(current_row);
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1045,6 +1045,30 @@ fn parse_copy_from_stdin() {
|
|||
12,KARL,BERRY,2017-11-02 19:15:42.308637+08,11.001
|
||||
\."#;
|
||||
pg_and_generic().verified_stmt(sql_comma_separated);
|
||||
|
||||
let incorrect_csv_sql = r#"COPY public.actor (actor_id, first_name, last_name, last_update, value) FROM STDIN (FORMAT csv, DELIMITER ',');
|
||||
1,PENELOPE,GUINESS,2006-02-15 09:34:33,0.11111
|
||||
2,NICK,WAHLBERG,2006-02-15 09:34:33
|
||||
\."#;
|
||||
let parsed = pg_and_generic().parse_sql_statements(incorrect_csv_sql);
|
||||
assert_eq!(
|
||||
parsed.unwrap_err(),
|
||||
ParserError::ParserError(
|
||||
"CSV row 2 has 4 columns, but expected 5 columns based on first row".to_string()
|
||||
)
|
||||
);
|
||||
|
||||
let mixed_incorrect_separators = r#"COPY public.actor (actor_id, first_name, last_name, last_update, value) FROM STDIN (FORMAT csv, DELIMITER ',');
|
||||
1,PENELOPE,GUINESS,2006-02-15 09:34:33,0.11111
|
||||
2 NICK WAHLBERG 2006-02-15 09:34:33,0.22222
|
||||
\."#;
|
||||
let parsed = pg_and_generic().parse_sql_statements(mixed_incorrect_separators);
|
||||
assert_eq!(
|
||||
parsed.unwrap_err(),
|
||||
ParserError::ParserError(
|
||||
"CSV row 2 has 2 columns, but expected 5 columns based on first row".to_string()
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue