Quick use case: Denormalizing data with Logstash

If you want to load the following data into Elasticsearch, you will need to reshape it first so you can create interesting transforms and visualizations in Kibana:

country   company   2018   2019   2020
morocco   elastic     10     15     20
morocco   splunk       5     10     15
morocco   cisco       15     20     25
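
On disk, this presumably corresponds to a semicolon-separated file with a header line (the separator and the header drop both appear in the pipeline below):

country;company;2018;2019;2020
morocco;elastic;10;15;20
morocco;splunk;5;10;15
morocco;cisco;15;20;25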

A better approach is to ingest the data into Elasticsearch in the following denormalized format, one row per country, company, and year:

country   company   year   metric
morocco   elastic   2018       10
morocco   elastic   2019       15
morocco   elastic   2020       20
morocco   splunk    2018        5
morocco   splunk    2019       10
morocco   splunk    2020       15
morocco   cisco     2018       15
morocco   cisco     2019       20
morocco   cisco     2020       25

The following Logstash pipeline is the solution. It relies on:

  • a ruby filter to build the metrics array
  • a split filter to generate multiple events, one per year
input {
    file {
        path => ["/opt/denormalize/dataset.txt"]
        start_position => "beginning"
        sincedb_path => "/opt/denormalize/dataset.sincedb"
    }
}

filter {

    # Parse each line with the csv filter
    csv {
        columns => [ "country", "company", "2018", "2019", "2020" ]
        separator => ";"
    }

    # Drop the event produced by the header line
    if [country] == "country" {
        drop {}
    }

    # Build the array of metrics, one entry per year column
    ruby {
        code => '
            metrics = []
            [ "2018", "2019", "2020" ].each { |v|
                metrics << { "year" => v, "metric" => event.get(v) }
            }
            event.set("metrics", metrics)
        '
    }

    # Split the array of metrics to denormalize our data
    split { field => "metrics" }

    # Apply some renaming & remove unnecessary fields
    mutate {
        rename => {
            "[metrics][metric]" => "metric"
            "[metrics][year]" => "year"
        }
        remove_field => [ "metrics", "2018", "2019", "2020", "@version", "message", "path", "host" ]
    }
}

output {
    stdout {}
}
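
To understand what the split filter receives, here is roughly what the event for the elastic row looks like after the ruby filter has run, just before splitting (rubydebug style; untouched fields such as message, path, host and the year columns are omitted for brevity):

{
    "@timestamp" => 2020-12-30T18:20:45.574Z,
       "company" => "elastic",
       "country" => "morocco",
       "metrics" => [
        [0] {
              "year" => "2018",
            "metric" => "10"
        },
        [1] {
              "year" => "2019",
            "metric" => "15"
        },
        [2] {
              "year" => "2020",
            "metric" => "20"
        }
    ]
}

The split filter clones this event once per element of the metrics array, and the mutate filter then promotes year and metric to top-level fields on each clone.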

The output will look like this:

{
          "year" => "2018",
    "@timestamp" => 2020-12-30T18:20:45.574Z,
        "metric" => "10",
       "company" => "elastic",
       "country" => "morocco"
}
{
          "year" => "2019",
    "@timestamp" => 2020-12-30T18:20:45.574Z,
        "metric" => "15",
       "company" => "elastic",
       "country" => "morocco"
}
{
          "year" => "2020",
    "@timestamp" => 2020-12-30T18:20:45.574Z,
        "metric" => "20",
       "company" => "elastic",
       "country" => "morocco"
}
{
          "year" => "2018",
    "@timestamp" => 2020-12-30T18:20:45.575Z,
        "metric" => "5",
       "company" => "splunk",
       "country" => "morocco"
}
{
          "year" => "2019",
    "@timestamp" => 2020-12-30T18:20:45.575Z,
        "metric" => "10",
       "company" => "splunk",
       "country" => "morocco"
}
{
          "year" => "2020",
    "@timestamp" => 2020-12-30T18:20:45.575Z,
        "metric" => "15",
       "company" => "splunk",
       "country" => "morocco"
}
{
          "year" => "2018",
    "@timestamp" => 2020-12-30T18:20:45.575Z,
        "metric" => "15",
       "company" => "cisco",
       "country" => "morocco"
}
{
          "year" => "2019",
    "@timestamp" => 2020-12-30T18:20:45.575Z,
        "metric" => "20",
       "company" => "cisco",
       "country" => "morocco"
}
{
          "year" => "2020",
    "@timestamp" => 2020-12-30T18:20:45.575Z,
        "metric" => "25",
       "company" => "cisco",
       "country" => "morocco"
}
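
This pipeline writes to stdout so you can check the result. To actually index the events into Elasticsearch, swap the stdout output for an elasticsearch output. Here is a minimal sketch, assuming a local cluster on localhost:9200 and an index named denormalized (both placeholders for your own setup); and since the csv filter produced metric as a string, a mutate convert keeps it numeric so Kibana can aggregate it:

filter {
    # Assumed addition: store the metric as a number, not a string
    mutate { convert => { "metric" => "integer" } }
}

output {
    elasticsearch {
        # Placeholder host and index name; adjust to your cluster
        hosts => ["http://localhost:9200"]
        index => "denormalized"
    }
}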