Often a quick data import is required. Systems keep getting more complicated, and what was once a simple import now has to pass through many layers.
Kubernetes is the standard; it is the best clay for sculpting out a system. At Translucent we build all of our systems on top of Kubernetes. There are many ways of interacting with Kubernetes to import data into a database running inside the cluster. Here, I'm going to describe a quick importer.
The Elasticsearch database is running within the Kubernetes cluster; its purpose is to store logs from the cluster and from every Docker container running within it. Email activity was exported from SendGrid into a CSV file, and our goal is to import that CSV file into an Elasticsearch index.
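For reference, the export has two columns, a timestamp and a message id, so the first few lines look something like this (the values here are made up):

timestamp,message_id
05/11/2019 10:15:00,14c5d75ce93.dfd.64b469
05/11/2019 10:16:42,14c5d75ce93.dfd.64b470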
We are cloud agnostic; in this case, the cluster is running in Google Cloud. Helm was used to install an Elasticsearch cluster into Kubernetes. The Elasticsearch cluster is made up of six (6) data nodes, three (3) master nodes, and one (1) client node. Logs are collected and pushed to Elasticsearch from the Kubernetes cluster with the Fluentd log pipeline, which was also installed with Helm.
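For context, installing that topology from the stable chart looks something like this (Helm 2 syntax of the time; the replica values come from the stable/elasticsearch chart, and the release and namespace names here match the pod we port-forward to later):

| => helm install stable/elasticsearch --name elasticsearch-monitoring \
       --namespace elasticsearch-monitoring \
       --set master.replicas=3,data.replicas=6,client.replicas=1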
To import the data, we are going to open a connection to the Elasticsearch client node, execute Python code locally, and push the data into an Elasticsearch index.
There are a few different Elasticsearch clients. We use the Java and NodeJS clients in our microservices; here I chose the Python client.
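If you don't have the client yet, it is a single pip install; as a rule of thumb, match the client library's major version to your Elasticsearch major version:

| => pip3 install elasticsearch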
'''
Quick Importer - save CSV data into ES index
'''
from elasticsearch import Elasticsearch
import csv
import os
import datetime


def importData(fileName, indexName):
    es = Elasticsearch()

    # get the file path, file is in the data dir
    filePath = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data', fileName)

    with open(filePath, 'rt') as f:
        reader = csv.reader(f)
        headers = next(reader)  # first line in the file is the header
        for row in reader:
            try:
                obj = {}
                for i, val in enumerate(row):
                    if i == 0:
                        # convert string to timestamp
                        obj[headers[i]] = datetime.datetime.strptime(val, '%m/%d/%Y %H:%M:%S')
                    else:
                        obj[headers[i]] = val
                # index the document; doc_type defaults to _doc and is not used on ES 7 clusters
                es.index(index=indexName, body=obj)
            except Exception as e:
                print("Error: {}".format(e))


# run the import
importData("emails.csv", "emails")
We find the CSV file, read it, iterate through the rows, build an object per row, and save each object into the Elasticsearch index.
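Indexing row by row means one HTTP round trip per document, which is fine for a small file. For a larger export, the client's bulk helper cuts that down considerably. A minimal sketch under the same CSV layout (the function names are my own, not part of the original importer):

from elasticsearch import Elasticsearch, helpers
import csv
import datetime


def generateActions(reader, headers, indexName):
    # yield one bulk action per CSV row
    for row in reader:
        obj = dict(zip(headers, row))
        # first column is the timestamp, same format as before
        obj[headers[0]] = datetime.datetime.strptime(row[0], '%m/%d/%Y %H:%M:%S')
        # _type kept for 6.x clusters; ES 7 defaults to _doc
        yield {'_index': indexName, '_type': '_doc', '_source': obj}


def bulkImport(filePath, indexName):
    es = Elasticsearch()
    with open(filePath, 'rt') as f:
        reader = csv.reader(f)
        headers = next(reader)
        helpers.bulk(es, generateActions(reader, headers, indexName))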
For security reasons, all our clusters have the master node protected. In GKE (Google Kubernetes Engine) you have to add your IP to the cluster's authorized networks before you can access it locally.
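With the gcloud CLI that looks something like this (the cluster name, zone, and IP are placeholders):

| => gcloud container clusters update my-cluster --zone us-central1-a \
       --enable-master-authorized-networks \
       --master-authorized-networks 203.0.113.7/32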
To expose the Elasticsearch client node locally, you run the port-forward command.
| => kubectl port-forward elasticsearch-monitoring-client-5f7fbd5865-mvtwd -n elasticsearch-monitoring 9200:9200
Forwarding from 127.0.0.1:9200 -> 9200
Forwarding from [::1]:9200 -> 9200
Handling connection for 9200
You give it the pod name of the Elasticsearch client node along with the namespace and the port mapping. In 9200:9200, the first number is your local port; I chose 9200 since it is the default port in the Python Elasticsearch client. Once you see "Handling connection for 9200", the tunnel is up.
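A quick sanity check that the tunnel works is to hit the cluster health endpoint:

| => curl "http://localhost:9200/_cluster/health?pretty"

If that comes back with a JSON document describing the cluster status, you are ready to run the Python code.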
| => python3 csv_importer.py
Kibana is a dashboard for Elasticsearch and can be installed with Helm as well; make sure the Elasticsearch and Kibana versions match. We can use Kibana to inspect the index and create visualizations from the imported data.
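For reference, a stable-chart install pinned to a specific version might look like this (the image tag here is an assumption; pick the tag that matches your Elasticsearch version):

| => helm install stable/kibana --name kibana --namespace elasticsearch-monitoring \
       --set image.tag=6.7.0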
Using the Kibana developer tools, we can extract the index mapping that was created by the importer.
GET emails/_mapping
{
  "emails" : {
    "mappings" : {
      "_doc" : {
        "properties" : {
          "message_id" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "timestamp" : {
            "type" : "date"
          }
        }
      }
    }
  }
}
A quick check shows that these are the two columns of data we had in the CSV, and the timestamp has the correct data type. We are done with the importer!
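If you also want to verify how many documents made it in, the count API in the developer tools does the trick:

GET emails/_count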
Elasticsearch Helm Chart
https://github.com/helm/charts/tree/master/stable/elasticsearch
Kibana Helm Chart
https://github.com/helm/charts/tree/master/stable/kibana
Fluentd Helm Chart
https://github.com/helm/charts/tree/master/stable/fluentd
Kubectl
https://kubernetes.io/docs/reference/kubectl/cheatsheet/
Python Elasticsearch Client
https://elasticsearch-py.readthedocs.io/en/master/
May 11th, 2019
by Patryk Golabek in Kubernetes In Action