new action: jsonToParquet (#2)
Signed-off-by: Hadi <112569860+anotherhadi@users.noreply.github.com>
This commit is contained in:
@@ -29,6 +29,8 @@ func main() {
|
||||
"infoParquet",
|
||||
// Csv
|
||||
"csvToParquet",
|
||||
// JSON
|
||||
"jsonToParquet",
|
||||
// Misc
|
||||
"mergeFiles",
|
||||
"deleteFirstLines",
|
||||
@@ -109,6 +111,26 @@ func main() {
|
||||
log.Fatal("Failed to transform Csv file", "error", err)
|
||||
}
|
||||
return
|
||||
case "jsonToParquet":
|
||||
var inputFile *string = flag.StringP("input", "i", "", "Input Parquet file")
|
||||
var outputFile *string = flag.StringP("output", "o", "", "Output Parquet file")
|
||||
var compression *string = flag.StringP("compression", "c", "ZSTD", "Compression codec (UNCOMPRESSED, SNAPPY, GZIP, BROTLI, LZ4, ZSTD)")
|
||||
var noColors *bool = flag.Bool("no-colors", false, "Remove all colors")
|
||||
var debug *bool = flag.Bool("debug", false, "Debug mode")
|
||||
flag.Parse()
|
||||
if *inputFile == "" || *outputFile == "" {
|
||||
log.Fatal("Input and output files are required")
|
||||
}
|
||||
if *noColors {
|
||||
settings.DisableColors()
|
||||
}
|
||||
lu.Compression = *compression
|
||||
lu.Debug = *debug
|
||||
err := misc.JsonToParquet(lu, *inputFile, *outputFile)
|
||||
if err != nil {
|
||||
log.Fatal("Failed to transform JSON file", "error", err)
|
||||
}
|
||||
return
|
||||
case "mergeFiles":
|
||||
var inputFiles *[]string = flag.StringArrayP("inputs", "i", []string{}, "Input Parquet files")
|
||||
var outputFile *string = flag.StringP("output", "o", "", "Output Parquet file")
|
||||
|
||||
107
leak-utils/misc/json.go
Normal file
107
leak-utils/misc/json.go
Normal file
@@ -0,0 +1,107 @@
|
||||
package misc
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/anotherhadi/eleakxir/leak-utils/settings"
|
||||
)
|
||||
|
||||
func flattenJSON(prefix string, in map[string]any, out map[string]any) {
|
||||
for k, v := range in {
|
||||
key := k
|
||||
if prefix != "" {
|
||||
key = prefix + "." + k
|
||||
}
|
||||
switch child := v.(type) {
|
||||
case map[string]any:
|
||||
flattenJSON(key, child, out)
|
||||
case []any:
|
||||
for i, item := range child {
|
||||
tempMap := make(map[string]any)
|
||||
tempMap[fmt.Sprintf("%d", i)] = item
|
||||
flattenJSON(key, tempMap, out)
|
||||
}
|
||||
default:
|
||||
out[key] = fmt.Sprintf("%v", child)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func flattenJSONFile(inputFile string, outputFile string) error {
|
||||
in, err := os.Open(inputFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer in.Close()
|
||||
|
||||
out, err := os.Create(outputFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer out.Close()
|
||||
|
||||
scanner := bufio.NewScanner(in)
|
||||
writer := bufio.NewWriter(out)
|
||||
defer writer.Flush()
|
||||
|
||||
for scanner.Scan() {
|
||||
line := strings.TrimSpace(scanner.Text())
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
if strings.HasPrefix(line, "[") {
|
||||
var arr []map[string]any
|
||||
if err := json.Unmarshal([]byte(line), &arr); err != nil {
|
||||
return fmt.Errorf("invalid JSON array: %w", err)
|
||||
}
|
||||
for _, obj := range arr {
|
||||
flat := make(map[string]any)
|
||||
flattenJSON("", obj, flat)
|
||||
b, err := json.Marshal(flat)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal flattened JSON: %w", err)
|
||||
}
|
||||
writer.Write(b)
|
||||
writer.WriteString("\n")
|
||||
}
|
||||
} else {
|
||||
var obj map[string]any
|
||||
if err := json.Unmarshal([]byte(line), &obj); err != nil {
|
||||
return fmt.Errorf("invalid JSON object: %w", err)
|
||||
}
|
||||
flat := make(map[string]any)
|
||||
flattenJSON("", obj, flat)
|
||||
b, err := json.Marshal(flat)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal flattened JSON: %w", err)
|
||||
}
|
||||
writer.Write(b)
|
||||
writer.WriteString("\n")
|
||||
}
|
||||
}
|
||||
if err := scanner.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func JsonToParquet(lu settings.LeakUtils, inputFile string, outputFile string) error {
|
||||
tmpFile := filepath.Join("/tmp", "leak-utils.flat.json")
|
||||
err := flattenJSONFile(inputFile, tmpFile)
|
||||
defer os.Remove(tmpFile)
|
||||
|
||||
query := fmt.Sprintf(`COPY (FROM read_json('%s', union_by_name=true)) TO '%s' (FORMAT 'parquet', COMPRESSION '%s', ROW_GROUP_SIZE 200000);`, tmpFile, outputFile, lu.Compression)
|
||||
|
||||
_, err = lu.Db.Exec(query)
|
||||
if err != nil {
|
||||
return fmt.Errorf("duckdb copy error: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user