Migrate PostgreSQL Table to Elasticsearch

This article demonstrates how to migrate a PostgreSQL table to Elasticsearch using a compound index to ensure uniqueness. Examples are provided in Python, Golang, NodeJS, Java, and C#.
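All of the examples assume a payments table with three columns: id, payment_name, and payment_info. If you want a throwaway table to test against, the sketch below creates and seeds one with psycopg2; the column types are an assumption, so adjust them to match your real schema.

import psycopg2

# Hypothetical test table with the columns the examples expect; adjust types
# and constraints to match your real payments table.
conn = psycopg2.connect(
    dbname="accounts", user="postgres", host="localhost", password="your_password"
)
with conn, conn.cursor() as cur:
    cur.execute("""
        CREATE TABLE IF NOT EXISTS payments (
            id           BIGSERIAL PRIMARY KEY,
            payment_name TEXT NOT NULL,
            payment_info TEXT
        )
    """)
    cur.execute(
        "INSERT INTO payments (payment_name, payment_info) VALUES (%s, %s)",
        ("invoice-1001", "paid by card"),
    )
conn.close()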

Note that you will need to replace the PostgreSQL connection parameters and Elasticsearch parameters with your own. Also, make sure to install the necessary dependencies before running each script.
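Before running any of the migrations, it can also help to confirm that both services are reachable with the credentials you plan to use. Here is a minimal pre-flight check in Python, assuming the localhost defaults used throughout this article:

from elasticsearch import Elasticsearch
import psycopg2

# Both checks fail fast if the connection details are wrong.
psycopg2.connect(
    dbname="accounts", user="postgres", host="localhost", password="your_password"
).close()

es = Elasticsearch("http://localhost:9200")
if not es.ping():
    raise SystemExit("Elasticsearch is not reachable on http://localhost:9200")
print("PostgreSQL and Elasticsearch are both reachable")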

Python – Elasticsearch library

This Python script queries the payments table of a PostgreSQL database called accounts, indexes the rows into Elasticsearch, then creates a second index with the id and payment_name fields as a compound index and reindexes the data into it. It uses the elasticsearch library.

Note that you will need to replace the PostgreSQL connection parameters and Elasticsearch parameters with your own. Also, make sure to install the psycopg2 (or psycopg2-binary) and elasticsearch Python packages, for example with pip install psycopg2-binary elasticsearch, before running this script.

from elasticsearch import Elasticsearch
import psycopg2

# Connect to PostgreSQL
conn = psycopg2.connect(
    dbname="accounts",
    user="postgres",
    host="localhost",
    password="your_password"
)
cur = conn.cursor()

# Query the payments table
cur.execute("SELECT id, payment_name, payment_info FROM payments")
rows = cur.fetchall()

# Connect to Elasticsearch (adjust the URL for your cluster)
es = Elasticsearch("http://localhost:9200")

# Create Elasticsearch index
for row in rows:
    es.index(
        index="payments-index",
        id=row[0],
        body={
            "id": row[0],
            "payment_name": row[1],
            "payment_info": row[2]
        }
    )

# Make the new documents visible to the reindex step below
es.indices.refresh(index="payments-index")

# Create compound Elasticsearch index
es.indices.create(index='payments-compound-index', body={
    'mappings': {
        'properties': {
            'id': {'type': 'keyword'},
            'payment_name': {'type': 'text'}
        }
    }
})

# Re-index the data with the compound index
es.reindex(
    body={
        "source": {
            "index": "payments-index"
        },
        "dest": {
            "index": "payments-compound-index"
        },
        "script": {
            "source": "ctx._source.remove('payment_info')"
        }
    }
)

# Close PostgreSQL connection
cur.close()
conn.close()
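
As a quick sanity check (not part of the script above), you can compare the row count in PostgreSQL with the document count in the new index; the snippet below assumes the same connection parameters as the script.

from elasticsearch import Elasticsearch
import psycopg2

conn = psycopg2.connect(
    dbname="accounts", user="postgres", host="localhost", password="your_password"
)
cur = conn.cursor()
cur.execute("SELECT count(*) FROM payments")
pg_count = cur.fetchone()[0]
cur.close()
conn.close()

es = Elasticsearch("http://localhost:9200")
es.indices.refresh(index="payments-compound-index")  # make recent writes visible
es_count = es.count(index="payments-compound-index")["count"]

print(f"PostgreSQL rows: {pg_count}, Elasticsearch documents: {es_count}")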

Python – Built-in HTTP client

This is the same migration without the elasticsearch client library: the script talks to Elasticsearch directly over HTTP using Python's built-in urllib module, while psycopg2 is still used for PostgreSQL. It assumes that the PostgreSQL server is running on the same machine as the script and listening on the default port, and that the Elasticsearch server is running on the same machine and listening on port 9200. Replace the connection parameters with your own as necessary.

import json
import sys
import urllib.request

import psycopg2

ES_URL = "http://localhost:9200"


def es_request(method, path, body=None):
    # Minimal Elasticsearch helper built on urllib: sends JSON, returns parsed JSON.
    data = json.dumps(body).encode("utf-8") if body is not None else None
    req = urllib.request.Request(
        ES_URL + path,
        data=data,
        method=method,
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req) as response:
        return json.loads(response.read().decode("utf-8"))


def create_payments_index(cur):
    cur.execute("SELECT id, payment_name, payment_info FROM payments")
    for row in cur.fetchall():
        payment = {"id": row[0], "payment_name": row[1], "payment_info": row[2]}
        es_request("PUT", f"/payments-index/_doc/{row[0]}", payment)
    # Make the new documents visible to the reindex step below
    es_request("POST", "/payments-index/_refresh")


def create_compound_index():
    index_mapping = {
        "mappings": {
            "properties": {
                "id": {"type": "keyword"},
                "payment_name": {"type": "text"},
            }
        }
    }
    es_request("PUT", "/payments-compound-index", index_mapping)


def reindex_data():
    reindexing_query = {
        "source": {"index": "payments-index"},
        "dest": {"index": "payments-compound-index"},
        "script": {"source": "ctx._source.remove('payment_info')"},
    }
    reindexing_response = es_request("POST", "/_reindex", reindexing_query)
    if reindexing_response.get("failures"):
        raise Exception("Error reindexing data")


def connect_to_postgresql():
    try:
        return psycopg2.connect(
            dbname="accounts",
            user="postgres",
            host="localhost",
            password="your_password",
        )
    except psycopg2.Error as e:
        print("Unable to connect to the database:", e)
        sys.exit(1)


if __name__ == "__main__":
    conn = connect_to_postgresql()
    cur = conn.cursor()

    create_payments_index(cur)
    create_compound_index()
    reindex_data()

    cur.close()
    conn.close()
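
The script above sends one HTTP request per row, which is fine for small tables. For larger tables you would typically batch documents through Elasticsearch's _bulk endpoint instead; the helper below is a sketch of that idea using the same urllib approach (the function name and defaults are illustrative, not part of the script above).

import json
import urllib.request


def bulk_index(documents, index="payments-index", es_url="http://localhost:9200"):
    # The _bulk API takes newline-delimited JSON: an action line, then a source line.
    lines = []
    for doc in documents:
        lines.append(json.dumps({"index": {"_index": index, "_id": doc["id"]}}))
        lines.append(json.dumps(doc))
    body = ("\n".join(lines) + "\n").encode("utf-8")
    req = urllib.request.Request(
        es_url + "/_bulk",
        data=body,
        method="POST",
        headers={"Content-Type": "application/x-ndjson"},
    )
    with urllib.request.urlopen(req) as response:
        return json.loads(response.read().decode("utf-8"))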

Golang

package main

import (
    "context"
    "database/sql"
    "encoding/json"
    "fmt"
    "log"
    "strings"

    "github.com/elastic/go-elasticsearch/v8"
    "github.com/elastic/go-elasticsearch/v8/esapi"
    _ "github.com/lib/pq"
)

type Payment struct {
    ID           int64  `json:"id"`
    PaymentName  string `json:"payment_name"`
    PaymentInfo  string `json:"payment_info"`
}

func main() {
    // Connect to PostgreSQL
    db, err := sql.Open("postgres", "user=postgres password=your_password dbname=accounts sslmode=disable")
    if err != nil {
        log.Fatalf("Unable to connect to the database: %s", err)
    }
    defer db.Close()

    // Query the payments table
    rows, err := db.Query("SELECT id, payment_name, payment_info FROM payments")
    if err != nil {
        log.Fatalf("Unable to query the database: %s", err)
    }
    defer rows.Close()

    // Connect to Elasticsearch
    es, err := elasticsearch.NewDefaultClient()
    if err != nil {
        log.Fatalf("Error creating the Elasticsearch client: %s", err)
    }

    // Create Elasticsearch index
    for rows.Next() {
        var payment Payment
        err := rows.Scan(&payment.ID, &payment.PaymentName, &payment.PaymentInfo)
        if err != nil {
            log.Fatalf("Error scanning rows: %s", err)
        }
        paymentJSON, err := json.Marshal(payment)
        if err != nil {
            log.Fatalf("Error marshaling payment to JSON: %s", err)
        }
        req := esapi.IndexRequest{
            Index:      "payments-index",
            DocumentID: fmt.Sprintf("%d", payment.ID),
            Body:       strings.NewReader(string(paymentJSON)),
            Refresh:    "true",
        }
        res, err := req.Do(context.Background(), es)
        if err != nil {
            log.Fatalf("Error indexing payment: %s", err)
        }
        res.Body.Close()
    }

    // Create compound Elasticsearch index
    createIndexReq := esapi.IndicesCreateRequest{
        Index: "payments-compound-index",
        Body:  strings.NewReader(`{"mappings":{"properties":{"id":{"type":"keyword"},"payment_name":{"type":"text"}}}}`),
    }
    createIndexRes, err := createIndexReq.Do(context.Background(), es)
    if err != nil {
        log.Fatalf("Error creating compound index: %s", err)
    }
    defer createIndexRes.Body.Close()

    // Reindex data with the compound index
    reindexReq := esapi.ReindexRequest{
        Body: strings.NewReader(`{"source":{"index":"payments-index"},"dest":{"index":"payments-compound-index"},"script":{"source":"ctx._source.remove('payment_info')"}}`),
    }
    reindexRes, err := reindexReq.Do(context.Background(), es)
    if err != nil {
        log.Fatalf("Error reindexing data: %s", err)
    }
    defer reindexRes.Body.Close()
}

NodeJS

const { Pool } = require('pg');
const { Client } = require('@elastic/elasticsearch');
const pool = new Pool({
  user: 'postgres',
  host: 'localhost',
  database: 'accounts',
  password: 'your_password',
  port: 5432,
});
const client = new Client({ node: 'http://localhost:9200' });

(async () => {
  try {
    // Query the payments table
    const query = 'SELECT id, payment_name, payment_info FROM payments';
    const { rows } = await pool.query(query);

    // Create Elasticsearch index
    for (const row of rows) {
      const { id, payment_name, payment_info } = row;
      await client.index({
        index: 'payments-index',
        id,
        body: { id, payment_name, payment_info },
      });
    }

    // Make the new documents visible to the reindex step below
    await client.indices.refresh({ index: 'payments-index' });

    // Create compound Elasticsearch index
    const mapping = {
      properties: {
        id: { type: 'keyword' },
        payment_name: { type: 'text' },
      },
    };
    await client.indices.create({
      index: 'payments-compound-index',
      body: { mappings: mapping },
    });

    // Reindex data with the compound index
    await client.reindex({
      wait_for_completion: true,
      body: {
        source: { index: 'payments-index' },
        dest: { index: 'payments-compound-index' },
        script: { source: 'ctx._source.remove("payment_info")' },
      },
    });
  } catch (error) {
    console.error(error);
  } finally {
    await pool.end();
    await client.close();
  }
})();

Java

import java.sql.*;
import java.util.*;

import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.script.Script;

public class Main {

    public static void main(String[] args) throws Exception {
        // Connect to PostgreSQL
        Connection conn = DriverManager.getConnection(
            "jdbc:postgresql://localhost:5432/accounts",
            "postgres",
            "your_password"
        );
        Statement stmt = conn.createStatement();
        ResultSet rs = stmt.executeQuery("SELECT id, payment_name, payment_info FROM payments");
        List<Map<String, Object>> payments = new ArrayList<>();
        while (rs.next()) {
            Map<String, Object> payment = new HashMap<>();
            payment.put("id", rs.getLong("id"));
            payment.put("payment_name", rs.getString("payment_name"));
            payment.put("payment_info", rs.getString("payment_info"));
            payments.add(payment);
        }

        // Connect to Elasticsearch
        RestHighLevelClient esClient = new RestHighLevelClient(
            RestClient.builder(new HttpHost("localhost", 9200, "http"))
        );

        // Create Elasticsearch index
        for (Map<String, Object> payment : payments) {
            IndexRequest indexReq = new IndexRequest("payments-index")
                .id(payment.get("id").toString())
                .source(payment, XContentType.JSON)
                .setRefreshPolicy(RefreshPolicy.IMMEDIATE);
            esClient.index(indexReq, RequestOptions.DEFAULT);
        }

        // Create compound Elasticsearch index
        CreateIndexRequest createIndexReq = new CreateIndexRequest("payments-compound-index")
            .mapping("{\n" +
                "  \"properties\": {\n" +
                "    \"id\": {\n" +
                "      \"type\": \"keyword\"\n" +
                "    },\n" +
                "    \"payment_name\": {\n" +
                "      \"type\": \"text\"\n" +
                "    }\n" +
                "  }\n" +
                "}", XContentType.JSON);
        esClient.indices().create(createIndexReq, RequestOptions.DEFAULT);

        // Reindex data with the compound index
        ReindexRequest reindexReq = new ReindexRequest()
            .setSourceIndices("payments-index")
            .setDestIndex("payments-compound-index")
            .setScript(new Script("ctx._source.remove('payment_info')"));
        esClient.reindex(reindexReq, RequestOptions.DEFAULT);

        // Close PostgreSQL and Elasticsearch connections
        rs.close();
        stmt.close();
        conn.close();
        esClient.close();
    }
}

C#

using System;
using System.Collections.Generic;
using Elasticsearch.Net;
using Nest;
using Npgsql;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            // Connect to PostgreSQL
            var connectionString = "User ID=postgres;Password=your_password;Host=localhost;Port=5432;Database=accounts;";
            using var conn = new NpgsqlConnection(connectionString);
            conn.Open();
            using var cmd = new NpgsqlCommand("SELECT id, payment_name, payment_info FROM payments", conn);
            using var reader = cmd.ExecuteReader();
            var payments = new List<Dictionary<string, object>>();
            while (reader.Read())
            {
                var payment = new Dictionary<string, object>
                {
                    { "id", reader.GetInt64(0) },
                    { "payment_name", reader.GetString(1) },
                    { "payment_info", reader.GetString(2) }
                };
                payments.Add(payment);
            }

            // Connect to Elasticsearch
            var node = new Uri("http://localhost:9200");
            var settings = new ConnectionSettings(node).DefaultIndex("default");
            var esClient = new ElasticClient(settings);

            // Create Elasticsearch index
            foreach (var payment in payments)
            {
                esClient.Index(payment, i => i
                    .Index("payments-index")
                    .Id(payment["id"].ToString())
                    .Refresh(Refresh.True));
            }

            // Create compound Elasticsearch index
            esClient.Indices.Create("payments-compound-index", c => c
                .Map<object>(m => m
                    .Properties(ps => ps
                        .Keyword(k => k.Name("id"))
                        .Text(t => t.Name("payment_name"))
                    )
                )
            );

            // Reindex data with the compound index
            esClient.ReindexOnServer(r => r
                .Source(s => s.Index("payments-index"))
                .Destination(d => d.Index("payments-compound-index"))
                .Script(sc => sc.Source("ctx._source.remove('payment_info')"))
                .WaitForCompletion()
            );

            // Close PostgreSQL and Elasticsearch connections
            reader.Close();
            conn.Close();
        }
    }
}