284 lines
7.7 KiB
Go
284 lines
7.7 KiB
Go
package parquet
|
|
|
|
import (
|
|
"bufio"
|
|
"database/sql"
|
|
"fmt"
|
|
"os"
|
|
"strings"
|
|
|
|
"github.com/anotherhadi/eleakxir/leak-utils/settings"
|
|
"github.com/charmbracelet/log"
|
|
)
|
|
|
|
// Parquet bundles what GetParquet learns about a parquet file, plus the
// compression to apply when a Parquet value describes an output file.
type Parquet struct {
	// Filepath is the path exactly as supplied; Filename is its last path
	// element.
	Filepath, Filename string

	// Columns holds the column names as returned by GetColumns.
	Columns []string

	// Sample holds the first rows of the file (GetParquet asks for 6), one
	// []string per row.
	Sample [][]string

	// NRows is the row count reported by countRows.
	NRows int64

	// Compression of the output file (e.g., "SNAPPY", "ZSTD", "NONE" or "").
	Compression string
}
|
|
|
|
// ColumnOperation records the decision taken for one input column when a
// cleaned parquet file is produced.
type ColumnOperation struct {
	// OriginalName is the source column. For non-dropped operations it is
	// placed verbatim in the SELECT list, so it is stored already
	// double-quoted there. NewName is the output name used by "rename".
	OriginalName, NewName string

	// Action is "keep", "rename" or "drop".
	Action string
}
|
|
|
|
func (parquet Parquet) PrintParquet() {
|
|
fmt.Println(settings.Header.Render(parquet.Filename) + "\n")
|
|
fmt.Println(settings.Accent.Render("File path:"), settings.Base.Render(parquet.Filepath))
|
|
fmt.Println(settings.Accent.Render("Number of columns:"), settings.Base.Render(fmt.Sprintf("%d", len(parquet.Columns))))
|
|
fmt.Println(settings.Accent.Render("Number of rows:"), settings.Base.Render(formatWithSpaces(parquet.NRows)))
|
|
fmt.Println()
|
|
fmt.Println(settings.Accent.Render(strings.Join(parquet.Columns, " | ")))
|
|
for _, row := range parquet.Sample {
|
|
fmt.Println(settings.Base.Render(strings.Join(row, " | ")))
|
|
}
|
|
}
|
|
|
|
func InfoParquet(lu settings.LeakUtils, inputFile string) error {
|
|
parquet, err := GetParquet(lu.Db, inputFile)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
parquet.PrintParquet()
|
|
return nil
|
|
}
|
|
|
|
func CleanParquet(lu settings.LeakUtils, inputFile, outputFile string, skipLineFormating, deleteFirstRow, printQuery bool) error {
|
|
input, err := GetParquet(lu.Db, inputFile)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
input.PrintParquet()
|
|
columnOps := configureColumns(*input, skipLineFormating)
|
|
output := Parquet{
|
|
Filepath: outputFile,
|
|
Compression: lu.Compression,
|
|
}
|
|
err = transformParquet(lu, *input, output, columnOps, deleteFirstRow, printQuery)
|
|
return err
|
|
}
|
|
|
|
func configureColumns(input Parquet, skipLineFormating bool) []ColumnOperation {
|
|
reader := bufio.NewReader(os.Stdin)
|
|
var operations []ColumnOperation
|
|
|
|
fmt.Println()
|
|
fmt.Println(settings.Base.Render("For each column, choose an action:"))
|
|
fmt.Println(settings.Base.Render(" [k] Keep"))
|
|
fmt.Println(settings.Base.Render(" [r] Rename"))
|
|
fmt.Println(settings.Base.Render(" [d] Drop/Delete"))
|
|
fmt.Println(settings.Base.Render(" [s] Suggested"))
|
|
fmt.Println(settings.Base.Render(" [b] Go back"))
|
|
fmt.Println()
|
|
|
|
for i := 0; i < len(input.Columns); i++ {
|
|
col := input.Columns[i]
|
|
suggestion := getSuggestion(col)
|
|
|
|
for {
|
|
fmt.Println(settings.Muted.Render("\nColumn:"), settings.Accent.Render(col))
|
|
if suggestion != "" {
|
|
fmt.Println(settings.Alert.Render("Suggested action: Rename to '" + suggestion + "'"))
|
|
}
|
|
fmt.Print(settings.Base.Render("[k/r/d/s/b]: "))
|
|
|
|
input, err := reader.ReadString('\n')
|
|
if err != nil {
|
|
log.Printf("Error reading input: %v", err)
|
|
continue
|
|
}
|
|
input = strings.TrimSpace(strings.ToLower(input))
|
|
|
|
op := ColumnOperation{
|
|
OriginalName: col,
|
|
NewName: col,
|
|
Action: "keep",
|
|
}
|
|
|
|
switch input {
|
|
case "b", "back":
|
|
if i > 0 {
|
|
i -= 2
|
|
if len(operations) > 0 {
|
|
operations = operations[:len(operations)-1]
|
|
}
|
|
fmt.Println(settings.Muted.Render("Going back to the previous column..."))
|
|
} else {
|
|
fmt.Println(settings.Muted.Render("Already at the first column, cannot go back further."))
|
|
continue
|
|
}
|
|
goto nextColumn
|
|
|
|
case "r", "rename":
|
|
fmt.Print(settings.Base.Render("Enter new name: "))
|
|
newName, err := reader.ReadString('\n')
|
|
if err != nil {
|
|
log.Printf("Error reading new name: %v", err)
|
|
continue
|
|
}
|
|
newName = strings.TrimSpace(newName)
|
|
if newName != "" {
|
|
op.OriginalName = "\"" + op.OriginalName + "\""
|
|
op.NewName = formatColumnName(newName)
|
|
op.Action = "rename"
|
|
operations = append(operations, op)
|
|
goto nextColumn
|
|
} else {
|
|
fmt.Println(settings.Muted.Render("Invalid name, please try again."))
|
|
continue
|
|
}
|
|
|
|
case "s", "suggested":
|
|
if suggestion != "" {
|
|
op.OriginalName = "\"" + op.OriginalName + "\""
|
|
op.NewName = formatColumnName(suggestion)
|
|
op.Action = "rename"
|
|
} else {
|
|
fmt.Println(settings.Muted.Render("No valid suggestion available"))
|
|
continue
|
|
}
|
|
operations = append(operations, op)
|
|
goto nextColumn
|
|
|
|
case "d", "drop", "delete":
|
|
op.Action = "drop"
|
|
operations = append(operations, op)
|
|
goto nextColumn
|
|
|
|
case "k", "keep", "":
|
|
op.OriginalName = "\"" + op.OriginalName + "\""
|
|
if input == "" && suggestion != "" {
|
|
op.NewName = formatColumnName(suggestion)
|
|
} else {
|
|
op.NewName = formatColumnName(op.NewName)
|
|
}
|
|
op.Action = "rename"
|
|
operations = append(operations, op)
|
|
goto nextColumn
|
|
|
|
default:
|
|
fmt.Println(settings.Muted.Render("Invalid choice, please enter [k/r/d/s/b]."))
|
|
continue
|
|
}
|
|
}
|
|
nextColumn:
|
|
lastOp := operations[len(operations)-1]
|
|
switch lastOp.Action {
|
|
case "rename":
|
|
if formatColumnName(lastOp.OriginalName) == lastOp.NewName {
|
|
fmt.Printf(settings.Muted.Render("Keeping column '%s' as is.\n"), lastOp.OriginalName)
|
|
} else {
|
|
fmt.Printf(settings.Muted.Render("Renaming column '%s' to '%s'.\n"), lastOp.OriginalName, lastOp.NewName)
|
|
}
|
|
case "drop":
|
|
fmt.Printf(settings.Muted.Render("Dropping column '%s'.\n"), lastOp.OriginalName)
|
|
}
|
|
}
|
|
if !skipLineFormating {
|
|
operations = formatColumns(operations)
|
|
}
|
|
|
|
return operations
|
|
}
|
|
|
|
func transformParquet(lu settings.LeakUtils, input, output Parquet, operations []ColumnOperation, deleteFirstRow, printQuery bool) error {
|
|
var selectClauses []string
|
|
hasColumns := false
|
|
|
|
for _, op := range operations {
|
|
if op.Action != "drop" {
|
|
hasColumns = true
|
|
if op.Action == "rename" {
|
|
selectClauses = append(selectClauses, fmt.Sprintf("%s AS \"%s\"", op.OriginalName, op.NewName))
|
|
} else {
|
|
selectClauses = append(selectClauses, op.OriginalName)
|
|
}
|
|
}
|
|
}
|
|
|
|
if !hasColumns {
|
|
return fmt.Errorf("no columns selected for output")
|
|
}
|
|
|
|
selectClause := strings.Join(selectClauses, ", ")
|
|
compression := ""
|
|
if output.Compression != "" {
|
|
compression = ", COMPRESSION '" + output.Compression + "'"
|
|
}
|
|
|
|
columnsLength := []string{}
|
|
for _, col := range input.Columns {
|
|
columnsLength = append(columnsLength, "COALESCE(LENGTH(\""+col+"\"),0)")
|
|
}
|
|
allowedRowSize := 30 * len(input.Columns)
|
|
offset := ""
|
|
if deleteFirstRow {
|
|
offset = "OFFSET 1"
|
|
}
|
|
|
|
query := fmt.Sprintf(`
|
|
COPY (
|
|
SELECT %s
|
|
FROM read_parquet('%s')
|
|
WHERE (%s) < %d %s
|
|
) TO '%s' (FORMAT PARQUET, ROW_GROUP_SIZE 200_000 %s)
|
|
`, selectClause, input.Filepath, strings.Join(columnsLength, "+"), allowedRowSize, offset, output.Filepath, compression)
|
|
|
|
if printQuery {
|
|
fmt.Println(settings.Base.Render("\nQuery:"))
|
|
fmt.Println(settings.Accent.Render(strings.ReplaceAll(strings.TrimSpace(query), "\t", "")))
|
|
return nil
|
|
}
|
|
if lu.Debug {
|
|
fmt.Println(settings.Base.Render("\nQuery:"))
|
|
fmt.Println(settings.Accent.Render(strings.ReplaceAll(strings.TrimSpace(query), "\t", "")))
|
|
}
|
|
|
|
fmt.Println(settings.Base.Render("\nTransforming and writing to output parquet..."))
|
|
_, err := lu.Db.Exec(query)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to execute transformation: %w", err)
|
|
}
|
|
fmt.Println(settings.Base.Render("Transformation complete!\n"))
|
|
|
|
newParquet, err := GetParquet(lu.Db, output.Filepath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
newParquet.PrintParquet()
|
|
|
|
return nil
|
|
}
|
|
|
|
func GetParquet(db *sql.DB, inputFile string) (parquet *Parquet, err error) {
|
|
parquet = &Parquet{}
|
|
parquet.Filepath = inputFile
|
|
|
|
parquet.Columns, err = GetColumns(db, inputFile)
|
|
if err != nil {
|
|
return
|
|
}
|
|
parquet.NRows, err = countRows(db, inputFile)
|
|
if err != nil {
|
|
return
|
|
}
|
|
parquet.Sample, err = getFirstNRows(db, inputFile, 6)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
n := strings.LastIndex(inputFile, "/")
|
|
if n == -1 {
|
|
parquet.Filename = inputFile
|
|
} else {
|
|
parquet.Filename = inputFile[n+1:]
|
|
}
|
|
|
|
return
|
|
}
|