Elasticsearch Top hits aggregation with transform


Product Manager at Synapticiel, IT & Telecoms Professional, Interested in ML, AI, IoT, BigData, Cyber Security, RA, FMS, Next Generation BSS/OSS, Mobile Money

Suppose you build an application to manage an entity (such as customers or tickets) and you log every change to that entity into an Elasticsearch index.

For example, a case management application logs every change to a ticket into monthly Elasticsearch indexes. The data inside Elasticsearch looks like this table:

image.png

The idea is to build a report based on the latest version of each ticket at a specific point in time. The table below shows the expected result, which can be used for many analytics:

image.png

Let's see how we can achieve this with Elasticsearch in different ways:

  • The following sample index mapping will be used for our use case
PUT tickets-logs
{
  "mappings": {
    "properties": {
      "@timestamp": {
        "type": "date"
      },
      "event": {
        "properties": {
          "category": {
            "type": "keyword"
          }
        }
      },
      "createdAt": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss"
      },
      "updatedAt": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss"
      },
      "ticketNumber": {
        "type": "keyword"
      },
      "ticketStatus": {
        "type": "keyword"
      },
      "createdBy": {
        "type": "keyword"
      },
      "updatedBy": {
        "type": "keyword"
      }
    }
  }
}
  • Load some data into our index
POST tickets-logs/_doc
{
  "@timestamp": "2020-12-15T14:10:10.000Z",
  "event": {
    "category": "ticket_update"
  },
  "ticketNumber": "TT-00001",
  "ticketStatus": "OPEN",
  "createdAt": "2020-12-15 10:10:10",
  "createdBy": "admin",
  "updatedBy": "admin",
  "updatedAt": "2020-12-15 10:10:10"
}
  • The data will look like this in Kibana (Discover app)

image.png
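To load more than one change event at a time, the same documents can be sent through the bulk API. The following is a minimal Python sketch that only builds the NDJSON body for `POST tickets-logs/_bulk`; the two events and the helper name `to_bulk_body` are made up for illustration, and sending the request is left to whichever HTTP client you use.

```python
import json

# Hypothetical change events for illustration; field names follow the mapping above.
events = [
    {
        "@timestamp": "2020-12-15T10:10:10.000Z",
        "event": {"category": "ticket_update"},
        "ticketNumber": "TT-00001",
        "ticketStatus": "OPEN",
        "createdAt": "2020-12-15 10:10:10",
        "createdBy": "admin",
        "updatedBy": "admin",
        "updatedAt": "2020-12-15 10:10:10",
    },
    {
        "@timestamp": "2020-12-15T14:10:10.000Z",
        "event": {"category": "ticket_update"},
        "ticketNumber": "TT-00001",
        "ticketStatus": "CLOSED",
        "createdAt": "2020-12-15 10:10:10",
        "createdBy": "admin",
        "updatedBy": "manager",
        "updatedAt": "2020-12-15 14:10:10",
    },
]


def to_bulk_body(docs):
    """Build an NDJSON bulk body: one action line followed by one source line per document."""
    lines = []
    for doc in docs:
        # Empty action metadata: the target index comes from the request URL.
        lines.append(json.dumps({"index": {}}))
        lines.append(json.dumps(doc))
    # The bulk API expects the body to end with a newline.
    return "\n".join(lines) + "\n"


bulk_body = to_bulk_body(events)
print(bulk_body)
```

The body must be sent with the `application/x-ndjson` content type.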

Elasticsearch offers a useful option in search queries to collapse events on a field and to order the collapsed results by another field (sorting).

GET tickets-logs/_search
{
  "from": 0, 
  "query": {
    "match_all": {}
  },
  "collapse": {
    "field": "ticketNumber"
  },
  "sort": [
    {
      "@timestamp": {
        "order": "desc"
      }
    }
  ]
}

This sample collapse query returns only one event per ticketNumber: the one with the latest @timestamp. The result is as follows:

{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 8,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [
      {
        "_index" : "tickets-logs",
        "_type" : "_doc",
        "_id" : "oFSfZnYBgg6oWjFlpMJK",
        "_score" : null,
        "_source" : {
          "ticketNumber" : "TT-00001",
          "createdAt" : "2020-12-15 10:10:10",
          "updatedBy" : "manager",
          "@timestamp" : "2020-12-15T14:10:10.000+01:00",
          "createdBy" : "admin",
          "ticketStatus" : "CLOSED",
          "event" : {
            "category" : "ticket_updated"
          },
          "updatedAt" : "2020-12-15 14:10:10"
        },
        "fields" : {
          "ticketNumber" : [
            "TT-00001"
          ]
        },
        "sort" : [
          1608041410000
        ]
      },
      {
        "_index" : "tickets-logs",
        "_type" : "_doc",
        "_id" : "olSfZnYBgg6oWjFlpMJK",
        "_score" : null,
        "_source" : {
          "ticketNumber" : "TT-00002",
          "createdAt" : "2020-12-15 12:10:10",
          "updatedBy" : "manager",
          "@timestamp" : "2020-12-15T13:10:10.000+01:00",
          "createdBy" : "admin",
          "ticketStatus" : "OPEN",
          "event" : {
            "category" : "ticket_updated"
          },
          "updatedAt" : "2020-12-15 13:10:10"
        },
        "fields" : {
          "ticketNumber" : [
            "TT-00002"
          ]
        },
        "sort" : [
          1608037810000
        ]
      }
    ]
  }
}
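The behaviour of collapse combined with a descending sort can be mimicked in plain Python. This is only a sketch of the semantics, not of how Elasticsearch implements it; the function name `collapse_latest` and the three sample events are hypothetical, and timestamps are assumed to be ISO strings in the same time zone so that they sort correctly as text.

```python
# Hypothetical events; only the fields needed for the illustration are kept.
events = [
    {"ticketNumber": "TT-00001", "ticketStatus": "OPEN", "@timestamp": "2020-12-15T09:10:10.000Z"},
    {"ticketNumber": "TT-00002", "ticketStatus": "OPEN", "@timestamp": "2020-12-15T12:10:10.000Z"},
    {"ticketNumber": "TT-00001", "ticketStatus": "CLOSED", "@timestamp": "2020-12-15T13:10:10.000Z"},
]


def collapse_latest(docs, key="ticketNumber", sort_field="@timestamp"):
    """Return one document per key: the first one met when sorting by sort_field descending."""
    seen = set()
    collapsed = []
    for doc in sorted(docs, key=lambda d: d[sort_field], reverse=True):
        if doc[key] not in seen:
            seen.add(doc[key])
            collapsed.append(doc)
    return collapsed


latest_per_ticket = collapse_latest(events)
for doc in latest_per_ticket:
    print(doc["ticketNumber"], doc["ticketStatus"])
```

Each ticket appears once, represented by its most recent event, and the overall order still follows the sort.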

The collapse query can be used by the application to scroll through all tickets, and it provides a convenient mechanism to collapse or expand the details of ticket changes. Unfortunately, at the time of writing (Kibana v7.10.1), Kibana does not offer the collapse option for showing results in the Discover app. So what is the solution (or solutions) using only Kibana?

Top Hits Aggregation

Top hits is one solution for quickly building a table with the latest attributes of each ticket.

GET tickets-logs/_search
{
  "size": 0, 
  "aggs": {
    "tiketId": {
      "terms": {
        "field": "ticketNumber",
        "size": 10
      },
      "aggs": {
        "ticket": {
          "top_hits": {
            "sort": [
              {
                "@timestamp": {
                  "order": "desc"
                }
              }
            ],
            "size": 1
          }
        }
      }
    }
  }
}

The result shows the latest document per ticketNumber, based on the latest @timestamp:

{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 8,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "tiketId" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "TT-00001",
          "doc_count" : 6,
          "ticket" : {
            "hits" : {
              "total" : {
                "value" : 6,
                "relation" : "eq"
              },
              "max_score" : null,
              "hits" : [
                {
                  "_index" : "tickets-logs",
                  "_type" : "_doc",
                  "_id" : "oFSfZnYBgg6oWjFlpMJK",
                  "_score" : null,
                  "_source" : {
                    "ticketNumber" : "TT-00001",
                    "createdAt" : "2020-12-15 10:10:10",
                    "updatedBy" : "manager",
                    "@timestamp" : "2020-12-15T14:10:10.000+01:00",
                    "createdBy" : "admin",
                    "ticketStatus" : "CLOSED",
                    "event" : {
                      "category" : "ticket_updated"
                    },
                    "updatedAt" : "2020-12-15 14:10:10"
                  },
                  "sort" : [
                    1608037810000
                  ]
                }
              ]
            }
          }
        },
        {
          "key" : "TT-00002",
          "doc_count" : 2,
          "ticket" : {
            "hits" : {
              "total" : {
                "value" : 2,
                "relation" : "eq"
              },
              "max_score" : null,
              "hits" : [
                {
                  "_index" : "tickets-logs",
                  "_type" : "_doc",
                  "_id" : "olSfZnYBgg6oWjFlpMJK",
                  "_score" : null,
                  "_source" : {
                    "ticketNumber" : "TT-00002",
                    "createdAt" : "2020-12-15 12:10:10",
                    "updatedBy" : "manager",
                    "@timestamp" : "2020-12-15T13:10:10.000+01:00",
                    "createdBy" : "admin",
                    "ticketStatus" : "OPEN",
                    "event" : {
                      "category" : "ticket_updated"
                    },
                    "updatedAt" : "2020-12-15 13:10:10"
                  },
                  "sort" : [
                    1608034210000
                  ]
                }
              ]
            }
          }
        }
      ]
    }
  }
}
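The shape of this response (one bucket per ticket, a document count, and the single most recent hit) can be reproduced with a small Python sketch. It illustrates the logic only, under the assumption that timestamps are comparable ISO strings; `terms_top_hits` and the sample events are hypothetical names and data.

```python
from collections import defaultdict

# Hypothetical events; only the fields needed for the illustration are kept.
events = [
    {"ticketNumber": "TT-00001", "ticketStatus": "OPEN", "@timestamp": "2020-12-15T09:10:10.000Z"},
    {"ticketNumber": "TT-00002", "ticketStatus": "OPEN", "@timestamp": "2020-12-15T12:10:10.000Z"},
    {"ticketNumber": "TT-00001", "ticketStatus": "CLOSED", "@timestamp": "2020-12-15T13:10:10.000Z"},
]


def terms_top_hits(docs, field="ticketNumber", sort_field="@timestamp", size=1):
    """Group documents by field, then keep the `size` most recent documents in each group."""
    groups = defaultdict(list)
    for doc in docs:
        groups[doc[field]].append(doc)

    buckets = []
    # A terms aggregation orders buckets by document count, largest first, by default.
    for key, hits in sorted(groups.items(), key=lambda kv: len(kv[1]), reverse=True):
        top = sorted(hits, key=lambda d: d[sort_field], reverse=True)[:size]
        buckets.append({"key": key, "doc_count": len(hits), "top_hits": top})
    return buckets


buckets = terms_top_hits(events)
for bucket in buckets:
    print(bucket["key"], bucket["doc_count"], bucket["top_hits"][0]["ticketStatus"])
```

Unlike the collapse sketch, each bucket also carries the number of change events recorded for the ticket.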

With a Data table visualization, we can quickly get the latest attributes of each ticket, as follows:

image.png

Transform

But this does not help us build other analytics from this view, such as a pie chart showing the percentage of tickets per status. The solution is to build a new entity-centric index for our entity ticketNumber using a transform. However, at the time of writing (v7.10.1), transforms do not yet support the top hits aggregation.

The workaround is to use a Painless script inside a scripted metric aggregation to get the desired result. Following this example, we will build a transform to populate our new entity-centric index.

The following request creates the transform, which can run in continuous mode or in batch mode:

PUT _transform/tickets_tranform
{
  "source": {
    "index": [
      "tickets-logs"
    ]
  },
  "pivot": {
    "group_by": {
      "ticketNumber": {
        "terms": {
          "field": "ticketNumber"
        }
      }
    },
    "aggregations": {
      "latest_ticket": {
        "scripted_metric": {
          "init_script": "state.timestamp_latest = 0L; state.last_doc = ''",
          "map_script": """
            // Keep the source of the most recently updated document seen on this shard
            def current_date = doc['updatedAt'].getValue().toInstant().toEpochMilli();
            if (current_date > state.timestamp_latest) {
              state.timestamp_latest = current_date;
              state.last_doc = new HashMap(params['_source']);
            }
          """,
          "combine_script": "return state",
          "reduce_script": """
            // Across all shard states, return the document with the latest updatedAt
            def last_doc = '';
            def timestamp_latest = 0L;
            for (s in states) {
              if (s.timestamp_latest > timestamp_latest) {
                timestamp_latest = s.timestamp_latest;
                last_doc = s.last_doc;
              }
            }
            return last_doc
          """
        }
      }
    }
  },
  "description": "Get the latest update per ticketNumber",
  "dest": {
    "index": "tickets-transform"
  },
  "frequency": "5m",
  "sync": {
    "time": {
      "field": "updatedAt",
      "delay": "60s"
    }
  }
}
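The four phases of the scripted metric (init, map, combine, reduce) can be easier to follow when replayed outside Elasticsearch. The Python sketch below mirrors the Painless scripts for a single ticket whose events are spread over two shards. The shard layout, the sample events and the function names are hypothetical, and updatedAt values are assumed to be UTC, which is how Elasticsearch reads a date that carries no time zone.

```python
from datetime import datetime, timezone


def to_epoch_millis(value):
    """Parse a 'yyyy-MM-dd HH:mm:ss' string (assumed UTC) into epoch milliseconds."""
    parsed = datetime.strptime(value, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
    return int(parsed.timestamp() * 1000)


def init_state():
    # init_script: one state object per shard
    return {"timestamp_latest": 0, "last_doc": ""}


def map_doc(state, doc):
    # map_script: runs once per document, keeps the most recently updated one
    current_date = to_epoch_millis(doc["updatedAt"])
    if current_date > state["timestamp_latest"]:
        state["timestamp_latest"] = current_date
        state["last_doc"] = dict(doc)


def reduce_states(states):
    # reduce_script: pick the winner among the per-shard states
    last_doc, timestamp_latest = "", 0
    for s in states:
        if s["timestamp_latest"] > timestamp_latest:
            timestamp_latest, last_doc = s["timestamp_latest"], s["last_doc"]
    return last_doc


# Hypothetical distribution of one ticket's events over two shards
shards = [
    [
        {"ticketNumber": "TT-00001", "ticketStatus": "OPEN", "updatedAt": "2020-12-15 10:10:10"},
        {"ticketNumber": "TT-00001", "ticketStatus": "OPEN", "updatedAt": "2020-12-15 12:10:10"},
    ],
    [
        {"ticketNumber": "TT-00001", "ticketStatus": "CLOSED", "updatedAt": "2020-12-15 14:10:10"},
    ],
]

states = []
for shard_docs in shards:
    state = init_state()
    for doc in shard_docs:
        map_doc(state, doc)
    states.append(state)  # combine_script: return the shard state unchanged

latest_ticket = reduce_states(states)
print(latest_ticket["ticketStatus"])
```

The reduce step is what makes the result correct when the events of one ticket live on different shards: each shard only knows its own latest document.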

The final result is saved into a new index, tickets-transform:

{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "tickets-transform",
        "_type" : "_doc",
        "_id" : "VK56D8atWjOwvkNHLedwMcAAAAAAAAAA",
        "_score" : 1.0,
        "_source" : {
          "ticketNumber" : "TT-00001",
          "latest_ticket" : {
            "ticketNumber" : "TT-00001",
            "createdAt" : "2020-12-15 10:10:10",
            "updatedBy" : "manager",
            "@timestamp" : "2020-12-15T14:10:10.000+01:00",
            "createdBy" : "admin",
            "ticketStatus" : "CLOSED",
            "event" : {
              "category" : "ticket_updated"
            },
            "updatedAt" : "2020-12-15 14:10:10"
          }
        }
      },
      {
        "_index" : "tickets-transform",
        "_type" : "_doc",
        "_id" : "VA4blk9me07KdqMYrb-KZOIAAAAAAAAA",
        "_score" : 1.0,
        "_source" : {
          "ticketNumber" : "TT-00002",
          "latest_ticket" : {
            "ticketNumber" : "TT-00002",
            "createdAt" : "2020-12-15 12:10:10",
            "updatedBy" : "manager",
            "@timestamp" : "2020-12-15T13:10:10.000+01:00",
            "createdBy" : "admin",
            "ticketStatus" : "OPEN",
            "event" : {
              "category" : "ticket_updated"
            },
            "updatedAt" : "2020-12-15 13:10:10"
          }
        }
      }
    ]
  }
}

And the job is done: we have a new index from which we can build all our entity analytics.

image.png


Hi, Recently we've implemented latest function for transforms which should allow you to replace pivot with latest and get rid of the scripts in your transform.

Check out the overview here and the API example here

Please note that this functionality is currently beta but it will change soon.
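As a rough illustration of that suggestion, the transform body could look like the one built in the Python sketch below, where the pivot block and its scripts are replaced by a latest block. This assumes an Elasticsearch version that ships the latest function; the transform id `tickets_latest` and the destination index `tickets-latest` are hypothetical names, and the sketch only prints the request rather than sending it.

```python
import json

# Sketch of a transform that uses `latest` instead of `pivot` plus scripted_metric.
transform_body = {
    "source": {"index": ["tickets-logs"]},
    "latest": {
        "unique_key": ["ticketNumber"],  # one destination document per ticket
        "sort": "updatedAt",             # keep the document with the highest updatedAt
    },
    "description": "Get the latest update per ticketNumber",
    "dest": {"index": "tickets-latest"},  # hypothetical destination index
    "frequency": "5m",
    "sync": {"time": {"field": "updatedAt", "delay": "60s"}},
}

# Print the request in Kibana Console form so it can be pasted into Dev Tools.
print("PUT _transform/tickets_latest")
print(json.dumps(transform_body, indent=2))
```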


Nice to see the Elastic team moving forward and enriching the aggregations supported in transforms. Thanks for the comment, I will have a look at this and give it a try.
