How to Fetch Large Logs from Elastic API Using the search_after Method

News Room · Published 10 July 2025

Background

I needed to extract a large volume of access logs from Elastic Cloud, spanning several months, for analysis. While researching, I discovered that the number of logs generated per month ran into the hundreds of millions, far exceeding the limits of Kibana’s built-in export tools. To overcome this, I implemented a Python script that uses the Elasticsearch API and the search_after method to retrieve the logs iteratively.

Challenges

  • High volume of logs: Querying logs for multiple months meant handling hundreds of millions of records.
  • Kibana limitations: The default Kibana UI and API have limitations that prevent fetching such large datasets in a single query.
  • Elasticsearch query limits: A single query can return a maximum of 10,000 records at a time (a short sketch of this limit follows the list).
  • Performance considerations: Avoiding high load on the Elasticsearch cluster while ensuring smooth data retrieval.
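
To make the query limit concrete, here is a minimal sketch (with a hypothetical endpoint, key, and index name) of why plain from/size paging cannot get past the window:

from elasticsearch import Elasticsearch

# Hypothetical connection values, for illustration only
es = Elasticsearch("https://example.es.cloud:9243", api_key="BASE64_KEY")

# Requests are fine while from + size <= 10000 (index.max_result_window);
# one step further and the cluster rejects the search outright.
try:
    es.search(index="EXAMPLE_INDEX", body={"from": 10000, "size": 100, "query": {"match_all": {}}})
except Exception as err:
    print(err)  # "Result window is too large, from + size must be less than or equal to: [10000]..."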

API token creation

To authenticate API requests, I created an API key with the following permissions:

{
  "superuser": {
    "cluster": ["all"],
    "indices": [
      {
        "names": ["*"],
        "privileges": ["all"],
        "allow_restricted_indices": false
      },
      {
        "names": ["*"],
        "privileges": ["monitor", "read", "view_index_metadata", "read_cross_cluster", "manage"],
        "allow_restricted_indices": true
      }
    ]
  }
}

Notes:

  • The API key must be Base64-encoded before it is used in requests (a short encoding sketch follows these notes).
  • While not the safest approach, this example grants all privileges on all indices and clusters because it is the simplest setup; in practice, limit the key to just the indices and privileges the task requires.
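
A minimal encoding sketch, assuming placeholder values for the id and api_key returned when the key is created (recent Elasticsearch versions also return the ready-made value in an encoded field):

import base64

# An Elasticsearch API key consists of an "id" and an "api_key" secret;
# clients expect base64("id:api_key") as the token.
api_key_id = "REPLACE_WITH_ID"        # placeholder
api_key_secret = "REPLACE_WITH_KEY"   # placeholder

token = base64.b64encode(f"{api_key_id}:{api_key_secret}".encode()).decode()
print(token)  # use this value as API_KEY in the script below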

Elasticsearch query setup

I used Kibana’s Dev Tools to construct a query before implementing it in Python. The query included:

  • A wildcard filter to fetch logs matching specific user agents.
  • A time range filter to limit logs within the required period.
  • Sorting by _doc to improve performance with search_after.

Example query:

{
  "query": {
    "bool": {
      "filter": [
        {"wildcard": {"json.ClientRequestUserAgent": {"value": "*oogle*"}}},
        {"range": {"@timestamp": {"gte": "now-2h", "lte": "now"}}}
      ]
    }
  },
  "size": 10000,
  "sort": [{"_doc": "desc"}], 
  "pit": {
     "id": "", 
     "keep_alive": "60m"
  },
  "fields": [
    "json.EdgeRequestHost","json.EdgeRequestPath", "json.ClientRequestUserAgent", "json.ClientRequestStatusCode", "json.ClientRequestReferer", "json.EdgeStartTimestamp"
  ]
}
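
Note that pit.id is left empty in the template: a point in time must be opened first and its id pasted in. In Dev Tools, assuming a hypothetical index name, that is:

POST /EXAMPLE_INDEX/_pit?keep_alive=60m

The response contains an id value; copy it into pit.id. A search that carries a PIT is then sent to GET /_search without an index in the path, because the PIT already pins the target indices.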

Implementation using Python

Before starting, install the elasticsearch package in any convenient way, e.g., pip install elasticsearch

I implemented a Python script using the elasticsearch library:

import json
import time
from datetime import datetime, timezone, timedelta
from elasticsearch import Elasticsearch

# Elasticsearch connection settings
ES_URL = ""
API_KEY = ""

# Query parameters
BATCH_SIZE = 10000 # Max 10000
OUTPUT_FILE = "logs.json"
INDEX = "EXAMPLE_INDEX"
KEEP_ALIVE = "60m"
TIME_WINDOW = 60  # Minutes

# Initialize Elasticsearch client
es = Elasticsearch(ES_URL, api_key=API_KEY, request_timeout=60, verify_certs=True)

def create_pit():
    """Create Point in Time"""
    return es.open_point_in_time(index=INDEX, keep_alive=KEEP_ALIVE)["id"]

def close_pit(pit_id):
    """Close Point in Time"""
    es.close_point_in_time(body={"id": pit_id})

def get_query(pit_id, search_after=None):
    """Generate search query for last TIME_WINDOW minutes"""
    now = datetime.now(timezone.utc)
    start_time = now - timedelta(minutes=TIME_WINDOW)
    query = {
        "pit": {"id": pit_id, "keep_alive": KEEP_ALIVE},
        "size": BATCH_SIZE,
        "sort": [{"_doc": "desc"}],
        "query": {
            "bool": {
                "filter": [
                    {"wildcard": {"json.ClientRequestUserAgent": {"value": "*oogle*"}}},
                    {"range": {"@timestamp": {"gte": start_time.isoformat(), "lte": now.isoformat(),
                                              "format": "strict_date_optional_time"}}}
                ]
            }
        },
        "fields": [
            "json.EdgeRequestHost", "json.EdgeRequestPath", "json.ClientRequestUserAgent", "json.ClientRequestStatusCode", "json.ClientRequestReferer", "json.EdgeStartTimestamp"
        ]
    }
    if search_after:
        query["search_after"] = search_after
    return query

def transform_hit(hit):
    """Transform a record into the required format"""
    fields = hit.get("fields", {})

    def first(name):
        # "fields" values come back as lists; take the first value, or "-" if missing
        return (fields.get(name) or ["-"])[0]

    return {
        "remote_ip": first("source.ip"),  # stays "-" unless source.ip is added to "fields" in the query
        "remote_log": "-",
        "user": "-",
        "timestamp": first("json.EdgeStartTimestamp"),
        "request-path": first("json.EdgeRequestPath"),
        "request-host": first("json.EdgeRequestHost"),
        "status": first("json.ClientRequestStatusCode"),
        "response-bytes": "-",
        "time-take": "-",
        "referer": first("json.ClientRequestReferer"),
        "ua": first("json.ClientRequestUserAgent"),
    }

def fetch_logs():
    """Fetch logs from Elasticsearch"""
    pit_id = None
    start_time = time.time()
    try:
        pit_id = create_pit()
        print("PIT opened")
        total_records = 0
        search_after = None
        print("Starting logs extraction from Elasticsearch...")
        with open(OUTPUT_FILE, 'w', encoding='utf-8') as outfile:
            while True:
                response = es.search(body=get_query(pit_id, search_after))
                hits = response.get("hits", {}).get("hits", [])
                if not hits:
                    break
                for hit in hits:
                    outfile.write(json.dumps(transform_hit(hit)) + "\n")
                    total_records += 1
                    if total_records % BATCH_SIZE == 0:
                        elapsed_time = time.time() - start_time
                        elapsed_str = str(timedelta(seconds=elapsed_time))
                        print(f"Processed {total_records} records. Time elapsed: {elapsed_str}...")
                search_after = hits[-1].get("sort")
                time.sleep(0.1)
        elapsed_time = time.time() - start_time
        elapsed_str = str(timedelta(seconds=elapsed_time))
        print(f"nTotal processed records: {total_records}. Saved to {OUTPUT_FILE}. Time taken: {elapsed_str}")
    except Exception as e:
        print(f"Error occurred: {e}")
    finally:
        if pit_id:
            try:
                close_pit(pit_id)
                print("PIT closed")
            except Exception as e:
                print(f"Error closing PIT: {e}")

if __name__ == "__main__":
    fetch_logs()
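
Each line of logs.json is a standalone JSON object. With the fields above, a record might look like this (illustrative values only):

{"remote_ip": "-", "remote_log": "-", "user": "-", "timestamp": "2025-07-10T12:00:00Z", "request-path": "/robots.txt", "request-host": "example.com", "status": "200", "response-bytes": "-", "time-take": "-", "referer": "-", "ua": "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"}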

Details

Traffic filters in Elastic Cloud

To allow access from any machine, you may need to configure traffic filters in Elastic Cloud.

  • Filters can be created in the Elastic Cloud console.
  • Official documentation: Elastic Cloud Traffic Filtering.
  • After creating a filter, don’t forget to apply it: go to cloud.elastic.co/deployments → select your deployment → Security → Traffic Filters → Apply Traffic Filter.

Query development in Kibana Dev Tools

Before implementing queries in the script, I first built and tested them in Kibana Dev Tools.

Documentation: Kibana Console.

Creating an API key in Elastic Cloud

To authenticate requests, create an API key in Elastic Cloud.

  • The API key must be Base64-encoded for use in the script.
  • Generate it in the Kibana Security section or in the Elastic Cloud console (a programmatic sketch follows this list).
  • Documentation: Create API Key.
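
The key can also be created programmatically through the security API. A minimal sketch, assuming admin credentials with the manage_api_key privilege and a hypothetical index name (the narrow role here is an example of limiting privileges, not the superuser key shown earlier):

from elasticsearch import Elasticsearch

# Placeholder admin credentials; the caller needs the manage_api_key privilege
admin = Elasticsearch("https://example.es.cloud:9243", basic_auth=("elastic", "PASSWORD"))

resp = admin.security.create_api_key(
    name="log-export",
    expiration="7d",  # optional: let the key expire after the extraction
    role_descriptors={
        "log_reader": {
            "cluster": ["monitor"],
            "indices": [
                {"names": ["EXAMPLE_INDEX"], "privileges": ["read", "view_index_metadata"]}
            ],
        }
    },
)
# resp["id"] and resp["api_key"] combine into base64("id:api_key");
# recent versions also return it ready-made as resp["encoded"].
print(resp["encoded"])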

Conclusion

Using search_after together with a point in time (PIT) made it possible to efficiently fetch large log datasets from Elastic Cloud, well beyond the 10,000-record limit of a single query.
