Signed-off-by: Hadi <112569860+anotherhadi@users.noreply.github.com>
This commit is contained in:
Hadi
2025-09-27 21:36:02 +02:00
parent f588ab0f04
commit 0382b6b3bb

View File

@@ -189,26 +189,16 @@ func configureColumns(input Parquet, skipLineFormating bool) []ColumnOperation {
func transformParquet(lu settings.LeakUtils, input, output Parquet, operations []ColumnOperation, deleteFirstRow, printQuery bool) error { func transformParquet(lu settings.LeakUtils, input, output Parquet, operations []ColumnOperation, deleteFirstRow, printQuery bool) error {
var selectClauses []string var selectClauses []string
var columnsLength []string
hasColumns := false hasColumns := false
for _, op := range operations { for _, op := range operations {
escapedOriginalName := escapeColumnName(op.OriginalName)
if op.Action != "drop" { if op.Action != "drop" {
hasColumns = true hasColumns = true
originalSelectName := op.OriginalName
if op.Action == "rename" { if op.Action == "rename" {
originalSelectName = op.OriginalName selectClauses = append(selectClauses, fmt.Sprintf("%s AS \"%s\"", op.OriginalName, op.NewName))
selectClauses = append(selectClauses, fmt.Sprintf("%s AS \"%s\"", originalSelectName, op.NewName))
} else { } else {
selectClauses = append(selectClauses, originalSelectName) selectClauses = append(selectClauses, op.OriginalName)
} }
columnsLength = append(columnsLength, fmt.Sprintf("COALESCE(LENGTH(\"%s\"),0)", escapedOriginalName))
} else {
columnsLength = append(columnsLength, fmt.Sprintf("COALESCE(LENGTH(\"%s\"),0)", escapedOriginalName))
} }
} }
@@ -222,6 +212,10 @@ func transformParquet(lu settings.LeakUtils, input, output Parquet, operations [
compression = ", COMPRESSION '" + output.Compression + "'" compression = ", COMPRESSION '" + output.Compression + "'"
} }
columnsLength := []string{}
for _, col := range input.Columns {
columnsLength = append(columnsLength, "COALESCE(LENGTH(\""+col+"\"),0)")
}
allowedRowSize := 30 * len(input.Columns) allowedRowSize := 30 * len(input.Columns)
offset := "" offset := ""
if deleteFirstRow { if deleteFirstRow {
@@ -230,7 +224,7 @@ func transformParquet(lu settings.LeakUtils, input, output Parquet, operations [
query := fmt.Sprintf(` query := fmt.Sprintf(`
COPY ( COPY (
SELECT %s  SELECT %s
FROM read_parquet('%s') FROM read_parquet('%s')
WHERE (%s) < %d %s WHERE (%s) < %d %s
) TO '%s' (FORMAT PARQUET, ROW_GROUP_SIZE 200_000 %s) ) TO '%s' (FORMAT PARQUET, ROW_GROUP_SIZE 200_000 %s)
@@ -288,7 +282,3 @@ func GetParquet(db *sql.DB, inputFile string) (parquet *Parquet, err error) {
return return
} }
// escapeColumnName returns name with every double-quote character doubled
// (`"` becomes `""`). Callers wrap the result in double quotes when building
// a quoted column identifier for a SQL query string.
func escapeColumnName(name string) string {
	const quote = `"`
	// Splitting on the quote and re-joining with a doubled quote rewrites
	// every occurrence, and leaves quote-free input unchanged.
	segments := strings.Split(name, quote)
	return strings.Join(segments, quote+quote)
}