From 224454bf51db8147fc13b3f8318c39a0931e225e Mon Sep 17 00:00:00 2001 From: Hadi <112569860+anotherhadi@users.noreply.github.com> Date: Wed, 24 Sep 2025 22:51:10 +0200 Subject: [PATCH] new action: jsonToParquet (#2) Signed-off-by: Hadi <112569860+anotherhadi@users.noreply.github.com> --- leak-utils/leak-utils/main.go | 22 +++++++ leak-utils/misc/json.go | 107 ++++++++++++++++++++++++++++++++++ 2 files changed, 129 insertions(+) create mode 100644 leak-utils/misc/json.go diff --git a/leak-utils/leak-utils/main.go b/leak-utils/leak-utils/main.go index f8b395d..f36606f 100644 --- a/leak-utils/leak-utils/main.go +++ b/leak-utils/leak-utils/main.go @@ -29,6 +29,8 @@ func main() { "infoParquet", // Csv "csvToParquet", + // JSON + "jsonToParquet", // Misc "mergeFiles", "deleteFirstLines", @@ -109,6 +111,26 @@ func main() { log.Fatal("Failed to transform Csv file", "error", err) } return + case "jsonToParquet": + var inputFile *string = flag.StringP("input", "i", "", "Input Parquet file") + var outputFile *string = flag.StringP("output", "o", "", "Output Parquet file") + var compression *string = flag.StringP("compression", "c", "ZSTD", "Compression codec (UNCOMPRESSED, SNAPPY, GZIP, BROTLI, LZ4, ZSTD)") + var noColors *bool = flag.Bool("no-colors", false, "Remove all colors") + var debug *bool = flag.Bool("debug", false, "Debug mode") + flag.Parse() + if *inputFile == "" || *outputFile == "" { + log.Fatal("Input and output files are required") + } + if *noColors { + settings.DisableColors() + } + lu.Compression = *compression + lu.Debug = *debug + err := misc.JsonToParquet(lu, *inputFile, *outputFile) + if err != nil { + log.Fatal("Failed to transform JSON file", "error", err) + } + return case "mergeFiles": var inputFiles *[]string = flag.StringArrayP("inputs", "i", []string{}, "Input Parquet files") var outputFile *string = flag.StringP("output", "o", "", "Output Parquet file") diff --git a/leak-utils/misc/json.go b/leak-utils/misc/json.go new file mode 100644 index 0000000..b2c8a62 --- /dev/null +++ b/leak-utils/misc/json.go @@ -0,0 +1,107 @@ +package misc + +import ( + "bufio" + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" + + "github.com/anotherhadi/eleakxir/leak-utils/settings" +) + +func flattenJSON(prefix string, in map[string]any, out map[string]any) { + for k, v := range in { + key := k + if prefix != "" { + key = prefix + "." + k + } + switch child := v.(type) { + case map[string]any: + flattenJSON(key, child, out) + case []any: + for i, item := range child { + tempMap := make(map[string]any) + tempMap[fmt.Sprintf("%d", i)] = item + flattenJSON(key, tempMap, out) + } + default: + out[key] = fmt.Sprintf("%v", child) + } + } +} + +func flattenJSONFile(inputFile string, outputFile string) error { + in, err := os.Open(inputFile) + if err != nil { + return err + } + defer in.Close() + + out, err := os.Create(outputFile) + if err != nil { + return err + } + defer out.Close() + + scanner := bufio.NewScanner(in) + writer := bufio.NewWriter(out) + defer writer.Flush() + + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if line == "" { + continue + } + + if strings.HasPrefix(line, "[") { + var arr []map[string]any + if err := json.Unmarshal([]byte(line), &arr); err != nil { + return fmt.Errorf("invalid JSON array: %w", err) + } + for _, obj := range arr { + flat := make(map[string]any) + flattenJSON("", obj, flat) + b, err := json.Marshal(flat) + if err != nil { + return fmt.Errorf("failed to marshal flattened JSON: %w", err) + } + writer.Write(b) + writer.WriteString("\n") + } + } else { + var obj map[string]any + if err := json.Unmarshal([]byte(line), &obj); err != nil { + return fmt.Errorf("invalid JSON object: %w", err) + } + flat := make(map[string]any) + flattenJSON("", obj, flat) + b, err := json.Marshal(flat) + if err != nil { + return fmt.Errorf("failed to marshal flattened JSON: %w", err) + } + writer.Write(b) + writer.WriteString("\n") + } + } + if err := scanner.Err(); err != nil { + return err + } + return nil +} + +func JsonToParquet(lu settings.LeakUtils, inputFile string, outputFile string) error { + tmpFile := filepath.Join("/tmp", "leak-utils.flat.json") + err := flattenJSONFile(inputFile, tmpFile) + defer os.Remove(tmpFile) + + query := fmt.Sprintf(`COPY (FROM read_json('%s', union_by_name=true)) TO '%s' (FORMAT 'parquet', COMPRESSION '%s', ROW_GROUP_SIZE 200000);`, tmpFile, outputFile, lu.Compression) + + _, err = lu.Db.Exec(query) + if err != nil { + return fmt.Errorf("duckdb copy error: %w", err) + } + + return nil +}