package parquet import ( "bufio" "database/sql" "fmt" "os" "strings" "github.com/anotherhadi/eleakxir/leak-utils/settings" "github.com/charmbracelet/log" ) type Parquet struct { Filepath string Filename string Columns []string Sample [][]string NRows int64 Compression string // Compression of the output file (e.g., "SNAPPY", "ZSTD", "NONE" or "") } type ColumnOperation struct { OriginalName string NewName string Action string // "keep", "rename", "drop" } func (parquet Parquet) PrintParquet() { fmt.Println(settings.Header.Render(parquet.Filename) + "\n") fmt.Println(settings.Accent.Render("File path:"), settings.Base.Render(parquet.Filepath)) fmt.Println(settings.Accent.Render("Number of columns:"), settings.Base.Render(fmt.Sprintf("%d", len(parquet.Columns)))) fmt.Println(settings.Accent.Render("Number of rows:"), settings.Base.Render(formatWithSpaces(parquet.NRows))) fmt.Println() fmt.Println(settings.Accent.Render(strings.Join(parquet.Columns, " | "))) for _, row := range parquet.Sample { fmt.Println(settings.Base.Render(strings.Join(row, " | "))) } } func InfoParquet(lu settings.LeakUtils, inputFile string) error { parquet, err := GetParquet(lu.Db, inputFile) if err != nil { return err } parquet.PrintParquet() return nil } func CleanParquet(lu settings.LeakUtils, inputFile, outputFile string, skipLineFormating, deleteFirstRow, printQuery bool) error { input, err := GetParquet(lu.Db, inputFile) if err != nil { return err } input.PrintParquet() columnOps := configureColumns(*input, skipLineFormating) output := Parquet{ Filepath: outputFile, Compression: lu.Compression, } err = transformParquet(lu, *input, output, columnOps, deleteFirstRow, printQuery) return err } func configureColumns(input Parquet, skipLineFormating bool) []ColumnOperation { reader := bufio.NewReader(os.Stdin) var operations []ColumnOperation fmt.Println() fmt.Println(settings.Base.Render("For each column, choose an action:")) fmt.Println(settings.Base.Render(" [k] Keep")) fmt.Println(settings.Base.Render(" [r] Rename")) fmt.Println(settings.Base.Render(" [d] Drop/Delete")) fmt.Println(settings.Base.Render(" [s] Suggested")) fmt.Println(settings.Base.Render(" [b] Go back")) fmt.Println() for i := 0; i < len(input.Columns); i++ { col := input.Columns[i] suggestion := getSuggestion(col) for { fmt.Println(settings.Muted.Render("\nColumn:"), settings.Accent.Render(col)) if suggestion != "" { fmt.Println(settings.Alert.Render("Suggested action: Rename to '" + suggestion + "'")) } fmt.Print(settings.Base.Render("[k/r/d/s/b]: ")) input, err := reader.ReadString('\n') if err != nil { log.Printf("Error reading input: %v", err) continue } input = strings.TrimSpace(strings.ToLower(input)) op := ColumnOperation{ OriginalName: col, NewName: col, Action: "keep", } switch input { case "b", "back": if i > 0 { i -= 2 if len(operations) > 0 { operations = operations[:len(operations)-1] } fmt.Println(settings.Muted.Render("Going back to the previous column...")) } else { fmt.Println(settings.Muted.Render("Already at the first column, cannot go back further.")) continue } goto nextColumn case "r", "rename": fmt.Print(settings.Base.Render("Enter new name: ")) newName, err := reader.ReadString('\n') if err != nil { log.Printf("Error reading new name: %v", err) continue } newName = strings.TrimSpace(newName) if newName != "" { op.OriginalName = "\"" + op.OriginalName + "\"" op.NewName = formatColumnName(newName) op.Action = "rename" operations = append(operations, op) goto nextColumn } else { fmt.Println(settings.Muted.Render("Invalid name, please try again.")) continue } case "s", "suggested": if suggestion != "" { op.OriginalName = "\"" + op.OriginalName + "\"" op.NewName = formatColumnName(suggestion) op.Action = "rename" } else { fmt.Println(settings.Muted.Render("No valid suggestion available")) continue } operations = append(operations, op) goto nextColumn case "d", "drop", "delete": op.Action = "drop" operations = append(operations, op) goto nextColumn case "k", "keep", "": op.OriginalName = "\"" + op.OriginalName + "\"" if input == "" && suggestion != "" { op.NewName = formatColumnName(suggestion) } else { op.NewName = formatColumnName(op.NewName) } op.Action = "rename" operations = append(operations, op) goto nextColumn default: fmt.Println(settings.Muted.Render("Invalid choice, please enter [k/r/d/s/b].")) continue } } nextColumn: lastOp := operations[len(operations)-1] switch lastOp.Action { case "rename": if formatColumnName(lastOp.OriginalName) == lastOp.NewName { fmt.Printf(settings.Muted.Render("Keeping column '%s' as is.\n"), lastOp.OriginalName) } else { fmt.Printf(settings.Muted.Render("Renaming column '%s' to '%s'.\n"), lastOp.OriginalName, lastOp.NewName) } case "drop": fmt.Printf(settings.Muted.Render("Dropping column '%s'.\n"), lastOp.OriginalName) } } if !skipLineFormating { operations = formatColumns(operations) } return operations } func transformParquet(lu settings.LeakUtils, input, output Parquet, operations []ColumnOperation, deleteFirstRow, printQuery bool) error { var selectClauses []string hasColumns := false for _, op := range operations { if op.Action != "drop" { hasColumns = true if op.Action == "rename" { selectClauses = append(selectClauses, fmt.Sprintf("%s AS \"%s\"", op.OriginalName, op.NewName)) } else { selectClauses = append(selectClauses, op.OriginalName) } } } if !hasColumns { return fmt.Errorf("no columns selected for output") } selectClause := strings.Join(selectClauses, ", ") compression := "" if output.Compression != "" { compression = ", COMPRESSION '" + output.Compression + "'" } columnsLength := []string{} for _, col := range input.Columns { columnsLength = append(columnsLength, "COALESCE(LENGTH(\""+col+"\"),0)") } allowedRowSize := 30 * len(input.Columns) offset := "" if deleteFirstRow { offset = "OFFSET 1" } query := fmt.Sprintf(` COPY ( SELECT %s FROM read_parquet('%s') WHERE (%s) < %d %s ) TO '%s' (FORMAT PARQUET, ROW_GROUP_SIZE 200_000 %s) `, selectClause, input.Filepath, strings.Join(columnsLength, "+"), allowedRowSize, offset, output.Filepath, compression) if printQuery { fmt.Println(settings.Base.Render("\nQuery:")) fmt.Println(settings.Accent.Render(strings.ReplaceAll(strings.TrimSpace(query), "\t", ""))) return nil } if lu.Debug { fmt.Println(settings.Base.Render("\nQuery:")) fmt.Println(settings.Accent.Render(strings.ReplaceAll(strings.TrimSpace(query), "\t", ""))) } fmt.Println(settings.Base.Render("\nTransforming and writing to output parquet...")) _, err := lu.Db.Exec(query) if err != nil { return fmt.Errorf("failed to execute transformation: %w", err) } fmt.Println(settings.Base.Render("Transformation complete!\n")) newParquet, err := GetParquet(lu.Db, output.Filepath) if err != nil { return err } newParquet.PrintParquet() return nil } func GetParquet(db *sql.DB, inputFile string) (parquet *Parquet, err error) { parquet = &Parquet{} parquet.Filepath = inputFile parquet.Columns, err = GetColumns(db, inputFile) if err != nil { return } parquet.NRows, err = countRows(db, inputFile) if err != nil { return } parquet.Sample, err = getFirstNRows(db, inputFile, 6) if err != nil { return } n := strings.LastIndex(inputFile, "/") if n == -1 { parquet.Filename = inputFile } else { parquet.Filename = inputFile[n+1:] } return }