I am using the Logstash JDBC input to keep MySQL and Elasticsearch in sync. It's working fine for one table, but now I want to do it for multiple tables. Do I need to run multiple instances in the terminal, like
logstash  agent -f /Users/logstash/logstash-jdbc.conf 
each with its own select query, or is there a better way to keep multiple tables updated?
My config file:
input {
  jdbc {
    jdbc_driver_library => "/Users/logstash/mysql-connector-java-5.1.39-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/database_name"
    jdbc_user => "root"
    jdbc_password => "password"
    schedule => "* * * * *"
    statement => "select * from table1"
  }
}
output {
    elasticsearch {
        index => "testdb"
        document_type => "table1"
        document_id => "%{table_id}"
        hosts => "localhost:9200"
    }
}
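Use only one input block and put every jdbc section inside it.
A condition such as id > :sql_last_value in the statement makes Logstash fetch only rows whose id is greater than the last id it read from the database, assuming you track rows by a field named id. With schedule => "* * * * *", the configuration above reads your SQL database every minute and updates the data in Elasticsearch.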
Logstash is an open source data processing pipeline that ingests events from one or more inputs, transforms them, and then sends each event to one or more outputs. Some Logstash implementations may have many lines of code and may process events from multiple input sources.
Each time Logstash polls MySQL, it stores the update or insertion time of the last record that it has read from MySQL.
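As a rough sketch of how that tracking might be configured, the jdbc input below assumes table1 has a numeric, ever-increasing column named id. The column name and the metadata file path are assumptions for illustration, not taken from the question. On each run, :sql_last_value is replaced with the last tracked value.

```
input {
  jdbc {
    jdbc_driver_library => "/Users/logstash/mysql-connector-java-5.1.39-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/database_name"
    jdbc_user => "root"
    jdbc_password => "password"
    schedule => "* * * * *"
    # Assumption: table1 has a numeric, ever-increasing "id" column
    statement => "SELECT * FROM table1 WHERE id > :sql_last_value ORDER BY id"
    # Track the value of a column instead of the time of the last run
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric"
    # Hypothetical path; give every jdbc input its own file
    last_run_metadata_path => "/Users/logstash/.table1_last_run"
  }
}
```

To track by update or insertion time instead, point tracking_column at a timestamp column and set tracking_column_type => "timestamp". With several jdbc inputs in one config, a separate last_run_metadata_path per input keeps their saved values from overwriting each other.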
You can definitely have a single config with multiple jdbc inputs and then parametrize the index and document_type in your elasticsearch output, depending on which table each event comes from.
input {
  jdbc {
    jdbc_driver_library => "/Users/logstash/mysql-connector-java-5.1.39-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/database_name"
    jdbc_user => "root"
    jdbc_password => "password"
    schedule => "* * * * *"
    statement => "select * from table1"
    type => "table1"
  }
  jdbc {
    jdbc_driver_library => "/Users/logstash/mysql-connector-java-5.1.39-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/database_name"
    jdbc_user => "root"
    jdbc_password => "password"
    schedule => "* * * * *"
    statement => "select * from table2"
    type => "table2"
  }
  # add more jdbc inputs to suit your needs 
}
output {
    elasticsearch {
        index => "testdb"
        document_type => "%{type}"   # <- use the type from each input
        hosts => "localhost:9200"
    }
}
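Since this answer also mentions parametrizing the index, here is a hedged variant of the output section. It assumes you want one index per table (indices created in Elasticsearch 6.x accept only one mapping type) and that every table has a primary key column named id. Both the index naming and the id column are assumptions.

```
output {
    elasticsearch {
        hosts => "localhost:9200"
        # One index per table, named after the type set on each jdbc input
        index => "testdb_%{type}"
        # Assumption: every table has a primary key column called "id"
        document_id => "%{id}"
    }
}
```

Using the primary key as document_id means a row that is read again overwrites its existing document instead of creating a new one.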
The following configuration will not create duplicate data, and it is compatible with Logstash 6.x.
# YOUR_DATABASE_NAME : test
# FIRST_TABLE :  place  
# SECOND_TABLE :  things    
# SET_DATA_INDEX : test_index_1, test_index_2
input {
    jdbc {
        # The path to our downloaded jdbc driver
        jdbc_driver_library => "/mysql-connector-java-5.1.44-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        # MySQL jdbc connection string to our database, YOUR_DATABASE_NAME
        jdbc_connection_string => "jdbc:mysql://localhost:3306/test"
        # The user we wish to execute our statement as
        jdbc_user => "root"
        jdbc_password => ""
        schedule => "* * * * *"
        statement => "SELECT  @slno:=@slno+1 aut_es_1, es_qry_tbl.* FROM (SELECT * FROM `place`) es_qry_tbl, (SELECT @slno:=0) es_tbl"
        type => "place"
        add_field => { "queryFunctionName" => "getAllDataFromFirstTable" }
        use_column_value => true
        tracking_column => "aut_es_1"
    }
    jdbc {
        # The path to our downloaded jdbc driver
        jdbc_driver_library => "/mysql-connector-java-5.1.44-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        # MySQL jdbc connection string to our database, YOUR_DATABASE_NAME
        jdbc_connection_string => "jdbc:mysql://localhost:3306/test"
        # The user we wish to execute our statement as
        jdbc_user => "root"
        jdbc_password => ""
        schedule => "* * * * *"
        statement => "SELECT  @slno:=@slno+1 aut_es_2, es_qry_tbl.* FROM (SELECT * FROM `things`) es_qry_tbl, (SELECT @slno:=0) es_tbl"
        type => "things"
        add_field => { "queryFunctionName" => "getAllDataFromSecondTable" }
        use_column_value => true
        tracking_column => "aut_es_2"
    } 
}
# install uuid plugin 'bin/logstash-plugin install logstash-filter-uuid'
# The uuid filter allows you to generate a UUID and add it as a field to each processed event.
filter {
    mutate {
            add_field => {
                    "[@metadata][document_id]" => "%{aut_es_1}%{aut_es_2}"
            }
    }
    uuid {
        target    => "uuid"
        overwrite => true
    }    
}
output {
    stdout {codec => rubydebug}
    if [type] == "place" {
        elasticsearch {
            hosts => "localhost:9200"
            index => "test_index_1_12"
            #document_id => "%{aut_es_1}"
            document_id => "%{[@metadata][document_id]}"
        }
    }
    if [type] == "things" {
        elasticsearch {
            hosts => "localhost:9200"
            index => "test_index_2_13"
            document_id => "%{[@metadata][document_id]}"
            # document_id => "%{aut_es_2}"
            # You can set document_id yourself; otherwise ES will generate a unique id.
        }
    }
}
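One caveat with the mutate filter above: as far as I know, a %{field} reference to a field that is missing from the event is left as literal text, so a place event would get an id like 1%{aut_es_2}. The id stays stable between runs, so it still prevents duplicates, but a sketch of a cleaner alternative is to set the id per type:

```
filter {
    # Build the document id only from the counter column that exists on the event
    if [type] == "place" {
        mutate { add_field => { "[@metadata][document_id]" => "%{aut_es_1}" } }
    } else if [type] == "things" {
        mutate { add_field => { "[@metadata][document_id]" => "%{aut_es_2}" } }
    }
}
```

This would replace the mutate section of the filter; the output blocks can keep using %{[@metadata][document_id]} unchanged.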