Kubernetes, Elasticsearch, Python Importer

Kubernetes, Elasticsearch, Python Importer

Often a quick data import is required. The systems are getting more complicated and what once was a simple import now has to go through many layers.

Kubernetes is the standard; it is the best clay for sculpting out a system. At Translucent we build all of our systems on top of Kubernetes. There are many different ways of interacting with Kubernetes to import data into a database running within the cluster. Here, I’m going to describe a quick importer.

Use case

The Elasticsearch database is running within the Kubernetes cluster. The purpose of the database is to store logs from the cluster and every docker container running within the cluster. Email activity was extracted from SendGrid into a CSV file and our goal is to import that CSV file into an Elasticsearch index.

Background

We are cloud agnostic. In this case, the cluster is running within Google Cloud. Helm was used to install an Elasticsearch cluster into Kubernetes. Elasticsearch cluster is made up of six (6) data nodes, three (3) masters nodes, and one (1) client node. The data is collected and pushed to Elasticsearch from the Kubernetes cluster with Fluentd log pipeline, which was also installed with Helm.

Process

To import the data, we are going to open a connection to the Elasticsearch client node, execute python code locally from my computer and push the data to an Elasticsearch index.

Code

There are a few different Elasticsearch clients. We use Java client and NodeJS client in our Microservices. Here I chose to use Python client.

'''
Quick Importer - save CSV data into ES index
'''
from elasticsearch import Elasticsearch
import csv
import os
import datetime

def importData(fileName, indexName):		
	es = Elasticsearch()

	# get the file path, file in data dir
	filePath = os.path.join(os.path.dirname(os.path.realpath(__file__)),os.path.join('data',fileName))
	
	with open(filePath, 'rt') as f:
		reader = csv.reader(f)
		headers = next(reader) # first line in the file is the header
		for row in reader:
			try:
				obj = {}
				for i, val in enumerate(row):
					if i == 0:
						# convert string to timestamp	
						obj[headers[i]] = datetime.datetime.strptime(val, '%m/%d/%Y %H:%M:%S')
					else:
						obj[headers[i]] = val

				#doc_type – Document type, defaults to _doc. Not used on ES 7 clusters.
				es.index(index=indexName, body=obj)
			except Exception as e:
				print("Error: {}:".format(e))						
		
# run the import			
importData("emails.csv", "emails")

We find the CSV file, read the file, iterate through it, create an object and save the object into Elasticsearch index.

Importing

For security reasons, all the clusters have the master node protected. In GKE (Google Kubernetes) you have to add your IP to the authorized network before you can access the cluster locally.

To expose the Elasticsearch client node locally you run the port forward command.

| => kubectl port-forward elasticsearch-monitoring-client-5f7fbd5865-mvtwd -n elasticsearch-monitoring 9200:9200
Forwarding from 127.0.0.1:9200 -> 9200
Forwarding from [::1]:9200 -> 9200
Handling connection for 9200

You give it the pod name of the Elasticsearch client node with the namespace and the port number. In 9200:9200, the first number is your local port. Here I chose to use 9200 for my local port since the default port in the python Elasticsearch client is 9200. Once you see “Handling connection for 9200” you are ready to run your python code.

| => python3 csv_importer.py

Kibana

Kibana is a dashboard for Elasticsearch and can be installed with Helm as well. Make sure you match Elasticsearch and Kibana versions. We can use Kibana to inspect the index and create visualizations from the imported data.

Using Kibana developer tools we can extract the index mapping that was created by the importer.

GET emails\_mapping
{
  "emails" : {
    "mappings" : {
      "_doc" : {
        "properties" : {
          "message_id" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "timestamp" : {
            "type" : "date"
          }
        }
      }
    }
  }
}

A quick check shows that these are the 2 columns of data that we had in the CSV and the timestamp is the correct data type. We are done with the importer!

Links

Elasticsearch Helm Chart

https://github.com/helm/charts/tree/master/stable/elasticsearch

Kibana Helm Chart

https://github.com/helm/charts/tree/master/stable/kibana

Fluentd Helm Chart

https://github.com/helm/charts/tree/master/stable/fluentd

Kubectl

https://kubernetes.io/docs/reference/kubectl/cheatsheet/

Python Elasticsearch Client

https://elasticsearch-py.readthedocs.io/en/master/

By on May 11th, 2019 in Technology
Tags: , ,


Go back to the Blog