Message Queue Producers

Overview

With SonicBase you can publish messages to an external message queue whenever inserts, updates, or deletes occur in the database. The currently supported messaging platforms are Kafka and Amazon Web Services Simple Queue Service (SQS). Additionally, custom message providers are supported. Your queue will receive messages from SonicBase in the format described below.

Performance Considerations

Your message queue provider must be extremely fast when you hook a producer into SonicBase; otherwise, database performance will suffer greatly.

Configuration

You configure message queue integration in the cluster config file. In the configuration you create a "queue" section containing an array of "producers". Each producer has a "className", a "maxBatchSize", and provider-specific config.

Example


    {
        "dataDirectory": "$HOME/db-data",
        "installDirectory": "$HOME/sonicbase",
        "compressRecords": false,
        "useUnsafe": true,
        "maxJavaHeap": "60%",
        "user": "ubuntu",
        "clientIsPrivate": false,
        "queue": {
            "producers" : [ {
                "maxBatchSize" : 100,
                "className" : "com.sonicbase.queue.AWSSQSMessageQueueProducer",
                "url" : "https://sqs.us-east-1.amazonaws.com/892217754330/benchmark-queue"
            },
            {
                "servers" : "10.0.0.231:9092",
                "maxBatchSize" : 100,
                "topic" : "test",
                "className" : "com.sonicbase.queue.KafkaMessageQueueProducer"
            } ]
        }
    }
            

Message Format

Header

The outermost part of the JSON must contain the following fields:
  • database - the name of the database for the affected records
  • table - the name of the table for the affected records
  • action - the action that triggered the publish of the message. The action may be "upsert" or "delete".
  • records - an array of the records being published

Binary fields are base64 encoded.

Example Message


    {
        "database": "test",
        "table": "persons",
        "action": "upsert",
        "records": [
            {
                "id": 123,
                "name": "bob"
            },
            {
                "id": 124,
                "name": "sue"
            }
        ]
    }

Administration

In order for AWS integration to work you must have a file named "<cluster>-awskeys" located in the "keys" subdirectory of the install directory. This file contains the access key on the first line and the secret key on the second line.
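For example, assuming a cluster named "my-cluster" (the cluster name, path, and key values below are all hypothetical placeholders), the file would look like:

```shell
# keys/my-cluster-awskeys  (hypothetical path and placeholder credentials)
# Line 1 is the AWS access key; line 2 is the AWS secret key.
AKIAIOSFODNN7EXAMPLE
wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
```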

Custom Message Providers

You can hook into any messaging system by providing a custom provider. Your class must implement the following interface, located in sonicbase-<version>.jar.


    package com.sonicbase.queue;

    public interface MessageQueueProducer {
        void init(String cluster, String jsonConfig, String jsonQueueConfig);

        void publish(String message);

        void shutdown();
    }
"init" is called once per thread to give you a chance to create your thread-specific producer.

"publish" is called for you to send a message to your message queue.

"shutdown" is called for you to disconnect from your provider.

Example Custom Provider

Below is an example provider based on the Kafka provider built into SonicBase. For your provider to work, you must place the jar containing your code and any dependencies in the "lib" directory under the install directory and deploy it across the cluster.

    package com.sonicbase.queue;

    public class KafkaMessageQueueProducer implements MessageQueueProducer {
        private String topic;
        private Producer<String, String> producer;

        @Override
        public void init(String cluster, String jsonConfig, String jsonQueueConfig) {
            JsonDict queueConfig = new JsonDict(jsonQueueConfig);
            String servers = queueConfig.getString("servers");
            this.topic = queueConfig.getString("topic");

            Properties props = new Properties();
            props.put("bootstrap.servers", servers);
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            producer = new KafkaProducer<>(props);
        }

        @Override
        public void publish(String message) {
            try {
                Future<RecordMetadata> response = producer.send(new ProducerRecord<>(topic, "message", message));
                response.get();
            } catch (Exception e) {
                throw new DatabaseException(e);
            }
        }

        @Override
        public void shutdown() {
        }
    }
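As a simpler illustration, here is a minimal sketch of a custom provider that just writes each published message to standard output. The interface is redeclared inline only to keep the sketch self-contained and runnable; in a real deployment you would compile against the MessageQueueProducer interface in sonicbase-<version>.jar instead, and the class and cluster names used here are hypothetical.

```java
// Minimal sketch of a custom provider that prints each published message.
// MessageQueueProducer is redeclared here only to keep the example
// self-contained; normally it comes from sonicbase-<version>.jar.
interface MessageQueueProducer {
    void init(String cluster, String jsonConfig, String jsonQueueConfig);
    void publish(String message);
    void shutdown();
}

public class StdoutMessageQueueProducer implements MessageQueueProducer {
    private String prefix;

    @Override
    public void init(String cluster, String jsonConfig, String jsonQueueConfig) {
        // A real provider would parse jsonQueueConfig for connection details;
        // this sketch only tags output with the cluster name.
        this.prefix = "[" + cluster + "] ";
    }

    @Override
    public void publish(String message) {
        System.out.println(prefix + message);
    }

    @Override
    public void shutdown() {
        // Nothing to disconnect from in this sketch.
    }

    public static void main(String[] args) {
        MessageQueueProducer producer = new StdoutMessageQueueProducer();
        producer.init("my-cluster", "{}", "{}");
        producer.publish("{\"database\":\"test\",\"table\":\"persons\",\"action\":\"upsert\"}");
        producer.shutdown();
    }
}
```

Because "init" is invoked once per thread, a provider like this holds its per-thread state (here, just the prefix) in instance fields rather than statics.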