Migrate PostgreSQL Table to Elasticsearch
This article demonstrates how to migrate a PostgreSQL table to Elasticsearch using a compound index to ensure uniqueness. Examples are in Python, Go (Golang), Node.js, Java and C#.
Note that you will need to replace the PostgreSQL connection parameters and Elasticsearch parameters with your own. Also, make sure to install the necessary dependencies before running each script.
Python – Elasticsearch library
Python script that queries a PostgreSQL database called accounts
and its table payments
, retrieves the data and creates an Elasticsearch index with the data. The Elasticsearch index is created with the id
and payment_name
fields as a compound index and uses the elasticsearch
library.
Note that you will need to replace the PostgreSQL connection parameters and Elasticsearch parameters with your own. Also, make sure to install the psycopg2
and elasticsearch
Python packages before running this script.
"""Copy the ``payments`` table of the ``accounts`` database into Elasticsearch.

Steps:
1. Read every row of ``payments`` from PostgreSQL.
2. Index each row into ``payments-index`` using the row id as document id.
3. Create ``payments-compound-index`` with mappings for ``id`` and
   ``payment_name``.
4. Reindex ``payments-index`` into ``payments-compound-index``, dropping the
   ``payment_info`` field on the way.
"""
from elasticsearch import Elasticsearch
import psycopg2

SOURCE_INDEX = "payments-index"
COMPOUND_INDEX = "payments-compound-index"

# Connect to PostgreSQL
conn = psycopg2.connect(
    dbname="accounts",
    user="postgres",
    host="localhost",
    password="your_password",
)
try:
    # Query the payments table; the cursor is closed when the block exits.
    with conn.cursor() as cur:
        cur.execute("SELECT id, payment_name, payment_info FROM payments")
        rows = cur.fetchall()
finally:
    # Close the PostgreSQL connection even if the query fails.
    conn.close()

# Connect to Elasticsearch. The URL is passed explicitly because newer client
# versions no longer fall back to localhost:9200 when no host is given.
es = Elasticsearch("http://localhost:9200")

# Create Elasticsearch index
for payment_id, payment_name, payment_info in rows:
    es.index(
        index=SOURCE_INDEX,
        id=payment_id,
        body={
            "id": payment_id,
            "payment_name": payment_name,
            "payment_info": payment_info,
        },
    )

# Newly indexed documents only become visible to searches (and therefore to
# _reindex) after a refresh; without this the reindex below can copy an
# incomplete or empty snapshot of the source index.
es.indices.refresh(index=SOURCE_INDEX)

# Create compound Elasticsearch index.
# NOTE: this raises if the index already exists (e.g. on a second run).
es.indices.create(
    index=COMPOUND_INDEX,
    body={
        "mappings": {
            "properties": {
                "id": {"type": "keyword"},
                "payment_name": {"type": "text"},
            }
        }
    },
)

# Re-index the data with the compound index
reindex_result = es.reindex(
    body={
        "source": {"index": SOURCE_INDEX},
        "dest": {"index": COMPOUND_INDEX},
        "script": {"source": "ctx._source.remove('payment_info')"},
    }
)
if reindex_result.get("failures"):
    raise RuntimeError(
        "Error reindexing data: {}".format(reindex_result["failures"])
    )
Python – Built-in libraries
This is the same script written without the elasticsearch client library: the Elasticsearch side uses only Python's standard library, while the PostgreSQL side still requires the psycopg2 package. Note that this script assumes that the PostgreSQL server is running on the same machine as the script and listening on the default port, and that the Elasticsearch server is running on the same machine and listening on port 9200. You will need to replace the connection parameters with your own as necessary.
"""Migrate the ``payments`` table to Elasticsearch over its REST API.

Elasticsearch is reached with the standard library's ``http.client``;
PostgreSQL is reached with ``psycopg2``.
"""
import http.client
import json
import sys
import urllib.parse

import psycopg2

ES_HOST = "localhost"
ES_PORT = 9200
SOURCE_INDEX = "payments-index"
COMPOUND_INDEX = "payments-compound-index"


def _es_request(es, method, path, body=None):
    """Send one JSON request to Elasticsearch and return the decoded reply.

    Args:
        es: an open ``http.client.HTTPConnection`` to Elasticsearch.
        method: HTTP verb, e.g. ``"PUT"`` or ``"POST"``.
        path: request path starting with ``/``.
        body: optional JSON-serialisable request body.

    Returns:
        The decoded JSON response, or ``{}`` when the response is empty.

    Raises:
        RuntimeError: if Elasticsearch answers with an HTTP status >= 400.
    """
    payload = None
    headers = {}
    if body is not None:
        payload = json.dumps(body).encode("utf-8")
        headers["Content-Type"] = "application/json"
    es.request(method, path, body=payload, headers=headers)
    response = es.getresponse()
    # The response must be read completely before the connection can be
    # reused for the next request.
    raw = response.read()
    if response.status >= 400:
        raise RuntimeError(
            "Elasticsearch {} {} failed with HTTP {}: {}".format(
                method, path, response.status, raw.decode("utf-8", "replace")
            )
        )
    return json.loads(raw) if raw else {}


def create_payments_index(cur, es):
    """Index every row of the ``payments`` table into ``payments-index``.

    Args:
        cur: an open psycopg2 cursor.
        es: an open ``http.client.HTTPConnection`` to Elasticsearch.
    """
    cur.execute("SELECT id, payment_name, payment_info FROM payments")
    for payment_id, payment_name, payment_info in cur.fetchall():
        payment = {
            "id": payment_id,
            "payment_name": payment_name,
            "payment_info": payment_info,
        }
        doc_id = urllib.parse.quote(str(payment_id), safe="")
        _es_request(es, "PUT", "/{}/_doc/{}".format(SOURCE_INDEX, doc_id), payment)
    # Make the new documents searchable so a following _reindex sees them.
    _es_request(es, "POST", "/{}/_refresh".format(SOURCE_INDEX))


def create_compound_index(es):
    """Create ``payments-compound-index`` with ``id``/``payment_name`` mappings.

    Raises:
        RuntimeError: if Elasticsearch rejects the request, for example
            because the index already exists.
    """
    index_mapping = {
        "mappings": {
            "properties": {
                "id": {"type": "keyword"},
                "payment_name": {"type": "text"},
            }
        }
    }
    _es_request(es, "PUT", "/{}".format(COMPOUND_INDEX), index_mapping)


def reindex_data(es):
    """Copy ``payments-index`` into the compound index without ``payment_info``.

    Raises:
        RuntimeError: if the request fails or the response lists failures.
    """
    reindexing_query = {
        "source": {"index": SOURCE_INDEX},
        "dest": {"index": COMPOUND_INDEX},
        "script": {"source": "ctx._source.remove('payment_info')"},
    }
    reindexing_response = _es_request(es, "POST", "/_reindex", reindexing_query)
    if reindexing_response.get("failures"):
        raise RuntimeError("Error reindexing data")


def connect_to_postgresql():
    """Open a connection to the ``accounts`` database and return a cursor.

    The owning connection stays reachable as ``cursor.connection`` so the
    caller can close it. Exits the process with status 1 on failure.
    """
    try:
        conn = psycopg2.connect(
            dbname="accounts",
            user="postgres",
            host="localhost",
            password="your_password",
        )
        return conn.cursor()
    except psycopg2.Error as e:
        print("Unable to connect to the database: ", e)
        sys.exit(1)


def connect_to_elasticsearch():
    """Open an HTTP connection to Elasticsearch on localhost:9200.

    Connects eagerly so an unreachable server is reported here rather than on
    the first request. Exits the process with status 1 on failure.
    """
    es = http.client.HTTPConnection(ES_HOST, ES_PORT)
    try:
        es.connect()
    except OSError as e:
        print("Unable to connect to Elasticsearch: ", e)
        sys.exit(1)
    return es


if __name__ == "__main__":
    cur = connect_to_postgresql()
    try:
        es = connect_to_elasticsearch()
        try:
            create_payments_index(cur, es)
            create_compound_index(es)
            reindex_data(es)
        finally:
            es.close()
    finally:
        # Close the connection as well as the cursor; the cursor alone does
        # not release the database session.
        pg_conn = cur.connection
        cur.close()
        pg_conn.close()
Golang
// Command main copies the payments table of the accounts PostgreSQL database
// into Elasticsearch, then reindexes it into payments-compound-index without
// the payment_info field.
package main

import (
	"bytes"
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"strings"

	"github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esapi"
	_ "github.com/lib/pq"
)

const (
	sourceIndex   = "payments-index"
	compoundIndex = "payments-compound-index"
)

// Payment is one row of the payments table and the JSON document indexed
// into Elasticsearch.
type Payment struct {
	ID          int64  `json:"id"`
	PaymentName string `json:"payment_name"`
	PaymentInfo string `json:"payment_info"`
}

// checkResponse closes the response body and turns a transport error or an
// Elasticsearch error status into a Go error prefixed with action.
func checkResponse(res *esapi.Response, doErr error, action string) error {
	if doErr != nil {
		return fmt.Errorf("%s: %w", action, doErr)
	}
	defer res.Body.Close()
	if res.IsError() {
		return fmt.Errorf("%s: %s", action, res.String())
	}
	return nil
}

// run performs the whole migration and returns the first error encountered,
// so deferred cleanups still execute before the process exits.
func run(ctx context.Context) error {
	// Connect to PostgreSQL
	db, err := sql.Open("postgres", "user=postgres password=your_password dbname=accounts sslmode=disable")
	if err != nil {
		return fmt.Errorf("unable to connect to the database: %w", err)
	}
	defer db.Close()

	// Query the payments table
	rows, err := db.QueryContext(ctx, "SELECT id, payment_name, payment_info FROM payments")
	if err != nil {
		return fmt.Errorf("unable to query the database: %w", err)
	}
	defer rows.Close()

	// Connect to Elasticsearch
	es, err := elasticsearch.NewDefaultClient()
	if err != nil {
		return fmt.Errorf("error creating the Elasticsearch client: %w", err)
	}

	// Create Elasticsearch index
	for rows.Next() {
		var payment Payment
		if err := rows.Scan(&payment.ID, &payment.PaymentName, &payment.PaymentInfo); err != nil {
			return fmt.Errorf("error scanning rows: %w", err)
		}

		paymentJSON, err := json.Marshal(payment)
		if err != nil {
			return fmt.Errorf("error marshaling payment to JSON: %w", err)
		}

		req := esapi.IndexRequest{
			Index:      sourceIndex,
			DocumentID: fmt.Sprintf("%d", payment.ID),
			Body:       bytes.NewReader(paymentJSON),
		}
		res, doErr := req.Do(ctx, es)
		// The body is closed inside checkResponse on every iteration rather
		// than deferred, so responses do not pile up until run returns.
		if err := checkResponse(res, doErr, "error indexing payment"); err != nil {
			return err
		}
	}
	// rows.Next returning false can also mean iteration failed part-way.
	if err := rows.Err(); err != nil {
		return fmt.Errorf("error reading rows: %w", err)
	}

	// Refresh once so every indexed document is visible to the reindex.
	refreshReq := esapi.IndicesRefreshRequest{Index: []string{sourceIndex}}
	refreshRes, doErr := refreshReq.Do(ctx, es)
	if err := checkResponse(refreshRes, doErr, "error refreshing index"); err != nil {
		return err
	}

	// Create compound Elasticsearch index
	createIndexReq := esapi.IndicesCreateRequest{
		Index: compoundIndex,
		Body:  strings.NewReader(`{"mappings":{"properties":{"id":{"type":"keyword"},"payment_name":{"type":"text"}}}}`),
	}
	createIndexRes, doErr := createIndexReq.Do(ctx, es)
	if err := checkResponse(createIndexRes, doErr, "error creating compound index"); err != nil {
		return err
	}

	// Reindex data with the compound index
	reindexReq := esapi.ReindexRequest{
		Body: strings.NewReader(`{"source":{"index":"payments-index"},"dest":{"index":"payments-compound-index"},"script":{"source":"ctx._source.remove('payment_info')"}}`),
	}
	reindexRes, doErr := reindexReq.Do(ctx, es)
	return checkResponse(reindexRes, doErr, "error reindexing data")
}

func main() {
	if err := run(context.Background()); err != nil {
		log.Fatalf("Migration failed: %s", err)
	}
}
NodeJS
// Copies the payments table of the accounts PostgreSQL database into
// Elasticsearch, then reindexes it into payments-compound-index without the
// payment_info field.
const { Pool } = require('pg');
const { Client } = require('@elastic/elasticsearch');

const SOURCE_INDEX = 'payments-index';
const COMPOUND_INDEX = 'payments-compound-index';

const pool = new Pool({
  user: 'postgres',
  host: 'localhost',
  database: 'accounts',
  password: 'your_password',
  port: 5432,
});

const client = new Client({ node: 'http://localhost:9200' });

(async () => {
  try {
    // Query the payments table
    const query = 'SELECT id, payment_name, payment_info FROM payments';
    const { rows } = await pool.query(query);

    // Create Elasticsearch index
    for (const row of rows) {
      const { id, payment_name, payment_info } = row;
      await client.index({
        index: SOURCE_INDEX,
        // Document ids are strings in Elasticsearch; pg may return a number.
        id: String(id),
        body: { id, payment_name, payment_info },
      });
    }

    // Indexed documents become searchable only after a refresh; without it
    // the reindex below may copy an incomplete or empty source index.
    await client.indices.refresh({ index: SOURCE_INDEX });

    // Create compound Elasticsearch index
    const mapping = {
      properties: {
        id: { type: 'keyword' },
        payment_name: { type: 'text' },
      },
    };
    await client.indices.create({
      index: COMPOUND_INDEX,
      body: { mappings: mapping },
    });

    // Reindex data with the compound index
    const reindexResponse = await client.reindex({
      wait_for_completion: true,
      body: {
        source: { index: SOURCE_INDEX },
        dest: { index: COMPOUND_INDEX },
        script: { source: 'ctx._source.remove("payment_info")' },
      },
    });
    // Client 7.x wraps the payload in `.body`; 8.x returns it directly.
    const reindexResult = reindexResponse.body || reindexResponse;
    if (reindexResult.failures && reindexResult.failures.length > 0) {
      throw new Error(
        `Error reindexing data: ${JSON.stringify(reindexResult.failures)}`
      );
    }
  } catch (error) {
    console.error(error);
    // Report the failure to the caller instead of exiting with status 0.
    process.exitCode = 1;
  } finally {
    await pool.end();
    await client.close();
  }
})();
Java
import java.sql.*; import java.util.*; import org.elasticsearch.client.*; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.reindex.ReindexRequest; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.xcontent.XContentType; public class Main { public static void main(String[] args) throws Exception { // Connect to PostgreSQL Connection conn = DriverManager.getConnection( "jdbc:postgresql://localhost:5432/accounts", "postgres", "your_password" ); Statement stmt = conn.createStatement(); ResultSet rs = stmt.executeQuery("SELECT id, payment_name, payment_info FROM payments"); List<Map<String, Object>> payments = new ArrayList<>(); while (rs.next()) { Map<String, Object> payment = new HashMap<>(); payment.put("id", rs.getLong("id")); payment.put("payment_name", rs.getString("payment_name")); payment.put("payment_info", rs.getString("payment_info")); payments.add(payment); } // Connect to Elasticsearch RestHighLevelClient esClient = new RestHighLevelClient( RestClient.builder(new HttpHost("localhost", 9200, "http")) ); // Create Elasticsearch index for (Map<String, Object> payment : payments) { IndexRequest indexReq = new IndexRequest("payments-index") .id(payment.get("id").toString()) .source(payment, XContentType.JSON) .setRefreshPolicy(RefreshPolicy.IMMEDIATE); esClient.index(indexReq); } // Create compound Elasticsearch index CreateIndexRequest createIndexReq = new CreateIndexRequest("payments-compound-index") .mapping("{\n" + " \"properties\": {\n" " \"id\": {\n" + " \"type\": \"keyword\"\n" + " },\n" + " \"payment_name\": {\n" + " \"type\": \"text\"\n" + " }\n" + " }\n" + "}", XContentType.JSON); esClient.indices().create(createIndexReq); // Reindex data with the compound index ReindexRequest reindexReq = new ReindexRequest() .setSource("payments-index") 
.setDest("payments-compound-index") .setScript(new Script("ctx._source.remove('payment_info')")); esClient.reindex(reindexReq); // Close PostgreSQL and Elasticsearch connections rs.close(); stmt.close(); conn.close(); esClient.close(); } }
C#
using System;
using System.Collections.Generic;
using System.Data;
using Elasticsearch.Net;
using Nest;
using Npgsql;

namespace ConsoleApp1
{
    /// <summary>
    /// Copies the payments table of the accounts PostgreSQL database into
    /// Elasticsearch, then reindexes it into payments-compound-index without
    /// the payment_info field.
    /// </summary>
    class Program
    {
        private const string SourceIndex = "payments-index";
        private const string CompoundIndex = "payments-compound-index";

        /// <summary>Reads every row of the payments table into memory.</summary>
        /// <returns>One dictionary per row keyed by column name.</returns>
        private static List<Dictionary<string, object>> LoadPayments()
        {
            var connectionString = "User ID=postgres;Password=your_password;Host=localhost;Port=5432;Database=accounts;";
            // The using declarations dispose reader, command and connection
            // when this method returns, including when an exception is thrown.
            using var conn = new NpgsqlConnection(connectionString);
            conn.Open();
            using var cmd = new NpgsqlCommand("SELECT id, payment_name, payment_info FROM payments", conn);
            using var reader = cmd.ExecuteReader();

            var payments = new List<Dictionary<string, object>>();
            while (reader.Read())
            {
                payments.Add(new Dictionary<string, object>
                {
                    { "id", reader.GetInt64(0) },
                    // GetString throws on NULL, so map NULL columns to null.
                    { "payment_name", reader.IsDBNull(1) ? null : reader.GetString(1) },
                    { "payment_info", reader.IsDBNull(2) ? null : reader.GetString(2) }
                });
            }
            return payments;
        }

        /// <summary>Throws when Elasticsearch reports a failed request.</summary>
        /// <exception cref="InvalidOperationException">The response is not valid.</exception>
        private static void EnsureValid(IResponse response, string action)
        {
            if (!response.IsValid)
            {
                throw new InvalidOperationException($"{action}: {response.DebugInformation}");
            }
        }

        static void Main(string[] args)
        {
            // Connect to PostgreSQL and read the table
            var payments = LoadPayments();

            // Connect to Elasticsearch
            var node = new Uri("http://localhost:9200");
            var settings = new ConnectionSettings(node).DefaultIndex("default");
            var esClient = new ElasticClient(settings);

            // Create Elasticsearch index. Refresh.True makes each document
            // searchable, so the reindex below sees all of them.
            foreach (var payment in payments)
            {
                var indexReq = new IndexRequest<Dictionary<string, object>>(SourceIndex, payment["id"].ToString())
                {
                    Document = payment,
                    Refresh = Refresh.True
                };
                EnsureValid(esClient.Index(indexReq), "Error indexing payment");
            }

            // Create compound Elasticsearch index
            var createIndexRes = esClient.Indices.Create(CompoundIndex, c => c
                .Map<Dictionary<string, object>>(m => m
                    .Properties(ps => ps
                        .Keyword(k => k.Name("id"))
                        .Text(t => t.Name("payment_name"))
                    )
                )
            );
            EnsureValid(createIndexRes, "Error creating compound index");

            // Reindex data with the compound index
            var reindexRes = esClient.ReindexOnServer(r => r
                .Source(s => s.Index(SourceIndex))
                .Destination(d => d.Index(CompoundIndex))
                .Script("ctx._source.remove('payment_info')")
                .WaitForCompletion()
            );
            EnsureValid(reindexRes, "Error reindexing data");
        }
    }
}
> Elasticsearch for Beginners |
An overview of Elasticsearch, its features, benefits, and how to get started with Elasticsearch |
> Advanced Elasticsearch |
Let’s talk about Elasticsearch and some of its advanced tools that tap into its powerful features. |
> Installing Elasticsearch |
I’ll walk you through the steps to install Elasticsearch on different operating systems |