Elasticsearch Top hits aggregation with transform

Suppose you build an application that manages an entity (such as customers or tickets) and logs every change to that entity into an Elasticsearch index.

For example, a case management application logs every change to a ticket into monthly Elasticsearch indices. The data inside Elasticsearch looks like this table:

image.png

The idea is to build a report based on the latest version of each ticket at a specific point in time. The table below shows the expected result, which can be used for many analytics:

image.png

Let's see how we can achieve this with Elasticsearch in different ways:

  • Following is the sample index mapping that will be used for our use case
PUT tickets-logs
{
  "mappings": {
    "properties": {
      "@timestamp": {
        "type": "date"
      },
      "event": {
        "properties": {
          "category": {
            "type": "keyword"
          }
        }
      },
      "createdAt": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss"
      },
      "updatedAt": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss"
      },
      "ticketNumber": {
        "type": "keyword"
      },
      "ticketStatus": {
        "type": "keyword"
      },
      "createdBy": {
        "type": "keyword"
      },
      "updatedBy": {
        "type": "keyword"
      }
    }
  }
}
  • Load some data into our index
POST tickets-logs/_doc
{
  "@timestamp": "2020-12-15T14:10:10.000Z",
  "event": {
    "category": "ticket_updated"
  },
  "ticketNumber": "TT-00001",
  "ticketStatus": "OPEN",
  "createdAt": "2020-12-15 10:10:10",
  "createdBy": "admin",
  "updatedBy": "admin",
  "updatedAt": "2020-12-15 10:10:10"
}
  • The data will look like this in Kibana (Discover app)

image.png

Elasticsearch offers a convenient option in search queries to collapse events on a field and to order the collapsed events by another field (sorting):

GET tickets-logs/_search
{
  "from": 0, 
  "query": {
    "match_all": {}
  },
  "collapse": {
    "field": "ticketNumber"
  },
  "sort": [
    {
      "@timestamp": {
        "order": "desc"
      }
    }
  ]
}

This sample collapse query returns only one event per ticketNumber: the one with the latest @timestamp. The result is as follows:

{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 8,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [
      {
        "_index" : "tickets-logs",
        "_type" : "_doc",
        "_id" : "oFSfZnYBgg6oWjFlpMJK",
        "_score" : null,
        "_source" : {
          "ticketNumber" : "TT-00001",
          "createdAt" : "2020-12-15 10:10:10",
          "updatedBy" : "manager",
          "@timestamp" : "2020-12-15T14:10:10.000+01:00",
          "createdBy" : "admin",
          "ticketStatus" : "CLOSED",
          "event" : {
            "category" : "ticket_updated"
          },
          "updatedAt" : "2020-12-15 14:10:10"
        },
        "fields" : {
          "ticketNumber" : [
            "TT-00001"
          ]
        },
        "sort" : [
          1608041410000
        ]
      },
      {
        "_index" : "tickets-logs",
        "_type" : "_doc",
        "_id" : "olSfZnYBgg6oWjFlpMJK",
        "_score" : null,
        "_source" : {
          "ticketNumber" : "TT-00002",
          "createdAt" : "2020-12-15 12:10:10",
          "updatedBy" : "manager",
          "@timestamp" : "2020-12-15T13:10:10.000+01:00",
          "createdBy" : "admin",
          "ticketStatus" : "OPEN",
          "event" : {
            "category" : "ticket_updated"
          },
          "updatedAt" : "2020-12-15 13:10:10"
        },
        "fields" : {
          "ticketNumber" : [
            "TT-00002"
          ]
        },
        "sort" : [
          1608037810000
        ]
      }
    ]
  }
}
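
To make the collapse semantics concrete, here is a minimal Python sketch that models what the query above does: sort all events by @timestamp in descending order, then keep the first event seen for each ticketNumber. This is only an illustration in plain Python, not how Elasticsearch implements collapsing; the function name `collapse_latest` and the small integer timestamps in the sample data are assumptions made for the example.

```python
def collapse_latest(docs, key="ticketNumber", sort_field="@timestamp"):
    """Keep one document per key: the one with the greatest sort_field."""
    # Sort newest first, like "sort": [{"@timestamp": {"order": "desc"}}]
    ordered = sorted(docs, key=lambda d: d[sort_field], reverse=True)
    seen = set()
    result = []
    for doc in ordered:
        # The first document seen for a key is its most recent one
        if doc[key] not in seen:
            seen.add(doc[key])
            result.append(doc)
    return result


# Hypothetical events; timestamps are simplified to small integers
events = [
    {"ticketNumber": "TT-00001", "ticketStatus": "OPEN", "@timestamp": 10},
    {"ticketNumber": "TT-00002", "ticketStatus": "OPEN", "@timestamp": 20},
    {"ticketNumber": "TT-00001", "ticketStatus": "CLOSED", "@timestamp": 30},
]
collapsed = collapse_latest(events)
```

In this sketch `collapsed` holds two documents, one per ticket, with the closed version of TT-00001 first because it is the most recent event.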

This query can be used by the application to page through all tickets, and it provides a nice mechanism to collapse and expand the details of ticket changes. Unfortunately, at the time of writing this post (Kibana v7.10.1), Kibana does not provide the collapse option for showing results in the Discover app. So what is the solution (or solutions) using only Kibana?

Top Hits Aggregation

Top hits is one solution to quickly build a table with the latest attributes of each ticket.

GET tickets-logs/_search
{
  "size": 0, 
  "aggs": {
    "tiketId": {
      "terms": {
        "field": "ticketNumber",
        "size": 10
      },
      "aggs": {
        "ticket": {
          "top_hits": {
            "sort": [
              {
                "@timestamp": {
                  "order": "desc"
                }
              }
            ],
            "size": 1
          }
        }
      }
    }
  }
}

The result shows the latest document per ticketNumber, based on the latest @timestamp:

{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 8,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "tiketId" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "TT-00001",
          "doc_count" : 6,
          "ticket" : {
            "hits" : {
              "total" : {
                "value" : 6,
                "relation" : "eq"
              },
              "max_score" : null,
              "hits" : [
                {
                  "_index" : "tickets-logs",
                  "_type" : "_doc",
                  "_id" : "oFSfZnYBgg6oWjFlpMJK",
                  "_score" : null,
                  "_source" : {
                    "ticketNumber" : "TT-00001",
                    "createdAt" : "2020-12-15 10:10:10",
                    "updatedBy" : "manager",
                    "@timestamp" : "2020-12-15T14:10:10.000+01:00",
                    "createdBy" : "admin",
                    "ticketStatus" : "CLOSED",
                    "event" : {
                      "category" : "ticket_updated"
                    },
                    "updatedAt" : "2020-12-15 14:10:10"
                  },
                  "sort" : [
                    1608037810000
                  ]
                }
              ]
            }
          }
        },
        {
          "key" : "TT-00002",
          "doc_count" : 2,
          "ticket" : {
            "hits" : {
              "total" : {
                "value" : 2,
                "relation" : "eq"
              },
              "max_score" : null,
              "hits" : [
                {
                  "_index" : "tickets-logs",
                  "_type" : "_doc",
                  "_id" : "olSfZnYBgg6oWjFlpMJK",
                  "_score" : null,
                  "_source" : {
                    "ticketNumber" : "TT-00002",
                    "createdAt" : "2020-12-15 12:10:10",
                    "updatedBy" : "manager",
                    "@timestamp" : "2020-12-15T13:10:10.000+01:00",
                    "createdBy" : "admin",
                    "ticketStatus" : "OPEN",
                    "event" : {
                      "category" : "ticket_updated"
                    },
                    "updatedAt" : "2020-12-15 13:10:10"
                  },
                  "sort" : [
                    1608034210000
                  ]
                }
              ]
            }
          }
        }
      ]
    }
  }
}
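
If the application consumes this response directly, the nested structure can be flattened into one row per ticket. The sketch below is a minimal example in plain Python that assumes the response has already been parsed into a dictionary; the helper name `latest_rows` and the `changes` column are illustrative choices, and the sample response is a trimmed copy of the one shown above.

```python
def latest_rows(response, terms_agg="tiketId", top_hits_agg="ticket"):
    """Flatten a terms + top_hits response into one row per bucket."""
    rows = []
    for bucket in response["aggregations"][terms_agg]["buckets"]:
        hits = bucket[top_hits_agg]["hits"]["hits"]
        if not hits:
            continue  # no document returned for this bucket
        source = hits[0]["_source"]  # size 1, so the first hit is the latest
        rows.append({
            "ticketNumber": bucket["key"],
            "ticketStatus": source.get("ticketStatus"),
            "updatedAt": source.get("updatedAt"),
            "changes": bucket["doc_count"],
        })
    return rows


# Trimmed copy of the aggregation response shown above
response = {
    "aggregations": {"tiketId": {"buckets": [
        {"key": "TT-00001", "doc_count": 6, "ticket": {"hits": {"hits": [
            {"_source": {"ticketStatus": "CLOSED",
                         "updatedAt": "2020-12-15 14:10:10"}}]}}},
        {"key": "TT-00002", "doc_count": 2, "ticket": {"hits": {"hits": [
            {"_source": {"ticketStatus": "OPEN",
                         "updatedAt": "2020-12-15 13:10:10"}}]}}},
    ]}}
}
rows = latest_rows(response)
```

Each row carries the ticket number, its latest status and update time, and the number of logged changes for that ticket.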

With a Data table visualization, we can quickly get the latest attributes of each ticket, as follows:

image.png

Transform

But this does not help us build other analytics from this view, such as a pie chart showing the percentage of tickets per status. The solution is to build a new entity-centric index for our entity ticketNumber using a transform. However, at the time of writing this post (v7.10.1), transforms do not yet support the top hits aggregation.

The workaround is to use a Painless script inside a scripted_metric aggregation to get the desired result. Following this example, we will build a transform to populate our new entity-centric index.

The following request creates the transform, which can run in continuous mode or batch mode:

PUT _transform/tickets_tranform
{
  "source": {
    "index": [
      "tickets-logs"
    ]
  },
  "pivot": {
    "group_by": {
      "ticketNumber": {
        "terms": {
          "field": "ticketNumber"
        }
      }
    },
    "aggregations": {
      "latest_ticket": {
        "scripted_metric": {
          "init_script": "state.timestamp_latest = 0L; state.last_doc = ''",
          "map_script": """
            // Per shard: remember the source of the most recently updated document.
            def current_date = doc['updatedAt'].getValue().toInstant().toEpochMilli();
            if (current_date > state.timestamp_latest) {
              state.timestamp_latest = current_date;
              state.last_doc = new HashMap(params['_source']);
            }
          """,
          "combine_script": "return state",
          "reduce_script": """
            // Across shards: keep the document with the greatest timestamp.
            def last_doc = '';
            def timestamp_latest = 0L;
            for (s in states) {
              if (s.timestamp_latest > timestamp_latest) {
                timestamp_latest = s.timestamp_latest;
                last_doc = s.last_doc;
              }
            }
            return last_doc
          """
        }
      }
    }
  },
  "description": "Get the latest update per ticketNumber",
  "dest": {
    "index": "tickets-transform"
  },
  "frequency": "5m",
  "sync": {
    "time": {
      "field": "updatedAt",
      "delay": "60s"
    }
  }
}
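
The four phases of the scripted metric can be hard to follow inside a JSON body, so here is a rough Python model of the same logic for a single ticketNumber bucket. It is a sketch under assumptions, not the Painless runtime: each inner list stands for the documents of one shard, `updatedAt_ms` is a hypothetical field holding the update time in epoch milliseconds, and the function names are made up for the example.

```python
def init_state():
    # Mirrors init_script: nothing seen yet on this shard
    return {"timestamp_latest": 0, "last_doc": ""}


def map_doc(state, doc):
    # Mirrors map_script: keep the most recently updated document
    if doc["updatedAt_ms"] > state["timestamp_latest"]:
        state["timestamp_latest"] = doc["updatedAt_ms"]
        state["last_doc"] = dict(doc)


def reduce_states(states):
    # Mirrors reduce_script: pick the latest document across shard states
    last_doc, timestamp_latest = "", 0
    for s in states:
        if s["timestamp_latest"] > timestamp_latest:
            timestamp_latest = s["timestamp_latest"]
            last_doc = s["last_doc"]
    return last_doc


def latest_ticket(shards):
    states = []
    for shard_docs in shards:
        state = init_state()
        for doc in shard_docs:
            map_doc(state, doc)
        states.append(state)  # combine_script returns the state unchanged
    return reduce_states(states)


# Hypothetical documents for one ticket spread over two shards
shards = [
    [{"ticketNumber": "TT-00001", "ticketStatus": "OPEN", "updatedAt_ms": 100},
     {"ticketNumber": "TT-00001", "ticketStatus": "OPEN", "updatedAt_ms": 200}],
    [{"ticketNumber": "TT-00001", "ticketStatus": "CLOSED", "updatedAt_ms": 300}],
]
latest = latest_ticket(shards)
```

Here `latest` is the closed version of the ticket, because it has the greatest `updatedAt_ms` across both shards. In the transform, the group_by on ticketNumber runs this logic once per ticket.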

The final result is saved into a new index, tickets-transform:

{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "tickets-transform",
        "_type" : "_doc",
        "_id" : "VK56D8atWjOwvkNHLedwMcAAAAAAAAAA",
        "_score" : 1.0,
        "_source" : {
          "ticketNumber" : "TT-00001",
          "latest_ticket" : {
            "ticketNumber" : "TT-00001",
            "createdAt" : "2020-12-15 10:10:10",
            "updatedBy" : "manager",
            "@timestamp" : "2020-12-15T14:10:10.000+01:00",
            "createdBy" : "admin",
            "ticketStatus" : "CLOSED",
            "event" : {
              "category" : "ticket_updated"
            },
            "updatedAt" : "2020-12-15 14:10:10"
          }
        }
      },
      {
        "_index" : "tickets-transform",
        "_type" : "_doc",
        "_id" : "VA4blk9me07KdqMYrb-KZOIAAAAAAAAA",
        "_score" : 1.0,
        "_source" : {
          "ticketNumber" : "TT-00002",
          "latest_ticket" : {
            "ticketNumber" : "TT-00002",
            "createdAt" : "2020-12-15 12:10:10",
            "updatedBy" : "manager",
            "@timestamp" : "2020-12-15T13:10:10.000+01:00",
            "createdBy" : "admin",
            "ticketStatus" : "OPEN",
            "event" : {
              "category" : "ticket_updated"
            },
            "updatedAt" : "2020-12-15 13:10:10"
          }
        }
      }
    ]
  }
}

And the job is done: we have a new index from which we can build all our entity analytics.
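
As an illustration of the kind of analytics the entity-centric index makes possible, the following Python sketch computes the percentage of tickets per status from the hits of tickets-transform, which is the figure a pie chart would display. It assumes the search response has already been parsed into a list of hits; `status_percentages` is a hypothetical helper name, and the sample hits are trimmed from the result shown above.

```python
from collections import Counter


def status_percentages(hits):
    """Return the share of tickets per latest status, in percent."""
    counts = Counter(
        hit["_source"]["latest_ticket"]["ticketStatus"] for hit in hits
    )
    total = sum(counts.values())
    if total == 0:
        return {}  # no tickets in the index
    return {status: 100.0 * n / total for status, n in counts.items()}


# Trimmed hits from the tickets-transform index
hits = [
    {"_source": {"ticketNumber": "TT-00001",
                 "latest_ticket": {"ticketStatus": "CLOSED"}}},
    {"_source": {"ticketNumber": "TT-00002",
                 "latest_ticket": {"ticketStatus": "OPEN"}}},
]
percentages = status_percentages(hits)
```

With the two tickets above, each status accounts for half of the tickets.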

image.png