Message Queue Consumers

Overview

With SonicBase you can consume messages from an external data source. This is especially useful when SonicBase is used as an analytics database. The currently supported messaging platforms are Kafka and Amazon Web Services Simple Queue Service (SQS). Custom message providers are also supported. To initiate an upsert or a delete, publish a message to your queue in the format specified below.

Configuration

You configure message queue integration in the cluster config file. In the configuration, create a "queue" section containing a "consumers" array. Each consumer has a "className", a "threadCount", and provider-specific settings (for example, "url" for SQS, or "servers" and "topic" for Kafka).

Example


    {
        "dataDirectory": "$HOME/db-data",
        "installDirectory": "$HOME/sonicbase",
        "compressRecords": false,
        "useUnsafe": true,
        "maxJavaHeap": "60%",
        "user": "ubuntu",
        "clientIsPrivate": false,
        "queue": {
            "consumers": [
                {
                    "className": "com.sonicbase.queue.AWSSQSMessageQueueConsumer",
                    "threadCount": 128,
                    "url": "https://sqs.us-east-1.amazonaws.com/892217715566/benchmark-queue2"
                },
                {
                    "className": "com.sonicbase.queue.KafkaMessageQueueConsumer",
                    "threadCount": 16,
                    "servers": "localhost:9092",
                    "topic": "test"
                }
            ]
        }
    }
            

Message Format

Header

The outermost part of the JSON must contain the following fields:
  • database - the name of the database for the affected records
  • table - the name of the table for the affected records
  • action - the action to take on the records. The action may be either "upsert" or "delete"
  • records - an array of records to act on. For "upsert", each record contains the fields you want to store. For "delete", each record contains the fields you want the candidate record to match.

Binary fields must be base64 encoded.

Example Message

    
    {
        "database": "test",
        "table": "persons",
        "action": "upsert",
        "records": [
            {
                "id": 123,
                "name": "bob"
            },
            {
                "id": 124,
                "name": "sue"
            }
        ]
    }
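
As a rough sketch of how a producer might assemble a message body, the Java snippet below builds a "delete" message by hand and base64-encodes a binary field. The class name QueueMessageExample, its methods, and the "photo" field are hypothetical and not part of SonicBase. The string concatenation assumes the names and values need no JSON escaping, so a JSON library is the safer choice in real code.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper, not part of SonicBase: builds a message body by hand.
class QueueMessageExample {

    // Base64-encodes a binary field value, as the message format requires.
    static String encodeBinary(byte[] value) {
        return Base64.getEncoder().encodeToString(value);
    }

    // Builds a "delete" message with one record that matches on "id" and on a
    // binary "photo" field (an assumed field name, used for illustration only).
    static String buildDeleteMessage(String database, String table, long id, byte[] photo) {
        return "{"
            + "\"database\": \"" + database + "\", "
            + "\"table\": \"" + table + "\", "
            + "\"action\": \"delete\", "
            + "\"records\": [{\"id\": " + id + ", \"photo\": \"" + encodeBinary(photo) + "\"}]"
            + "}";
    }

    public static void main(String[] args) {
        byte[] photo = "abc".getBytes(StandardCharsets.UTF_8);
        // The printed string is what you would publish to the queue.
        System.out.println(buildDeleteMessage("test", "persons", 123, photo));
    }
}
```

The resulting string is the message body you would publish through your Kafka or SQS producer. An "upsert" message has the same shape with a different "action" value.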
                

Administration

In order for AWS integration to work you must have a file named "<cluster>-awskeys" located in the "keys" subdirectory of the install directory. This file contains the access key on the first line and the secret key on the second line.

Start Consuming

When the cluster starts, consuming starts automatically if consumers are configured. To resume consuming after stopping it, type "start queue consumers" in the admin client.

Stop Consuming

In the admin client, type "stop queue consumers" to stop consuming across the cluster.

Custom Message Providers

You can hook into any messaging system by providing a custom provider. Your class must implement the following interface, located in sonicbase-<version>.jar.

    
    public interface MessageQueueConsumer {
        void init(String cluster, String jsonConfig, String jsonQueueConfig);
        List<Message> receive();
        void acknowledgeMessage(Message message);
        void shutdown();
    }

"init" is called once per thread so that you can create your thread-specific consumer.

"receive" is called to fetch messages from your provider. Return them as a list.

"acknowledgeMessage" is called so that you can acknowledge the message with your provider, if applicable.

"shutdown" is called so that you can disconnect from your provider.

The supporting class "Message" is shown below. You may derive a class from Message to store other message details specific to your provider (such as a receipt).

    
    public class Message {
        private String body;

        public Message() { }

        public Message(String body) { this.body = body; }

        public void setBody(String body) { this.body = body; }

        public String getBody() { return body; }
    }
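
To make the four calls more concrete, here is a minimal sketch of a provider backed by an in-process queue instead of a real broker. InMemoryConsumer, ReceiptMessage, BROKER, and the demo driver are all hypothetical names. The MessageQueueConsumer and Message types are redeclared locally only so the file compiles on its own, and the order in which the driver invokes the methods is an assumption, not a description of how SonicBase schedules them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Demo driver (hypothetical): walks one consumer through the four calls by hand.
class InMemoryConsumerDemo {
    public static void main(String[] args) {
        InMemoryConsumer.BROKER.add("{\"database\": \"test\", \"table\": \"persons\", "
            + "\"action\": \"upsert\", \"records\": [{\"id\": 123, \"name\": \"bob\"}]}");

        MessageQueueConsumer consumer = new InMemoryConsumer();
        consumer.init("demo-cluster", "{}", "{}");
        for (Message message : consumer.receive()) {
            System.out.println(message.getBody());
            consumer.acknowledgeMessage(message);
        }
        consumer.shutdown();
    }
}

// Local stand-ins that mirror the interface and class shown above so this file
// compiles on its own. A real provider would use the types from the SonicBase jar.
interface MessageQueueConsumer {
    void init(String cluster, String jsonConfig, String jsonQueueConfig);
    List<Message> receive();
    void acknowledgeMessage(Message message);
    void shutdown();
}

class Message {
    private String body;
    public Message() { }
    public Message(String body) { this.body = body; }
    public void setBody(String body) { this.body = body; }
    public String getBody() { return body; }
}

// Hypothetical provider backed by an in-process queue instead of a real broker.
class InMemoryConsumer implements MessageQueueConsumer {
    // Stands in for the external messaging system; shared by all consumers.
    static final Queue<String> BROKER = new ConcurrentLinkedQueue<>();

    // Derived message that carries a provider-specific receipt.
    static class ReceiptMessage extends Message {
        final long receipt;
        ReceiptMessage(String body, long receipt) {
            super(body);
            this.receipt = receipt;
        }
    }

    final List<Long> acknowledged = new ArrayList<>();
    private long nextReceipt = 1;
    private volatile boolean shutdown;

    @Override
    public void init(String cluster, String jsonConfig, String jsonQueueConfig) {
        // Nothing to connect to here; a real provider would read its settings
        // from jsonQueueConfig and open its own connection.
        shutdown = false;
    }

    @Override
    public List<Message> receive() {
        List<Message> batch = new ArrayList<>();
        String body;
        // Drain up to 10 pending bodies, or nothing once shutdown was requested.
        while (!shutdown && batch.size() < 10 && (body = BROKER.poll()) != null) {
            batch.add(new ReceiptMessage(body, nextReceipt++));
        }
        return batch;
    }

    @Override
    public void acknowledgeMessage(Message message) {
        // Record the receipt; a real provider would confirm it with the broker.
        acknowledged.add(((ReceiptMessage) message).receipt);
    }

    @Override
    public void shutdown() {
        shutdown = true;
    }
}
```

Deriving ReceiptMessage from Message keeps the provider-specific receipt next to the body, so acknowledgeMessage can recover it with a cast.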

Example Custom Provider

Below is an example provider based on the Amazon Web Services Simple Queue Service provider built into SonicBase. For your provider to work, you must place the jar containing your code and any dependencies in the "lib" directory under the install directory and deploy it across the cluster.

    package com.sonicbase.queue;

    import com.amazonaws.ClientConfiguration;
    import com.amazonaws.auth.BasicAWSCredentials;
    import com.amazonaws.services.sqs.AmazonSQSClient;
    import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
    import com.amazonaws.services.sqs.model.ReceiveMessageResult;

    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.ArrayList;
    import java.util.List;

    // JsonDict and DatabaseException are SonicBase classes and must be imported as well.

    public class AWSSQSMessageQueueConsumer implements MessageQueueConsumer {
        private String url;
        private AmazonSQSClient sqsClient;
        private boolean shutdown;

        // Keeps the original SQS message so its receipt handle is available on acknowledge.
        class AWSMessage extends com.sonicbase.queue.Message {
            private final com.amazonaws.services.sqs.model.Message message;

            public AWSMessage(com.amazonaws.services.sqs.model.Message message, String body) {
                super(body);
                this.message = message;
            }
        }

        // Resolves "installDirectory" from the cluster config, expanding $HOME.
        public File getInstallDir(JsonDict config) {
            String dir = config.getString("installDirectory");
            return new File(dir.replace("$HOME", System.getProperty("user.home")));
        }

        @Override
        public void shutdown() {
            this.shutdown = true;
        }

        @Override
        public void init(String cluster, String jsonConfig, String jsonQueueConfig) {
            final ClientConfiguration clientConfig = new ClientConfiguration();
            clientConfig.setMaxConnections(10);
            clientConfig.setRequestTimeout(20_000);
            clientConfig.setConnectionTimeout(60_000);

            // Read the access key and secret key from <install dir>/keys/<cluster>-awskeys.
            JsonDict config = new JsonDict(jsonConfig);
            File installDir = getInstallDir(config);
            File keysFile = new File(installDir, "/keys/" + cluster + "-awskeys");
            if (!keysFile.exists()) {
                throw new DatabaseException(cluster + "-awskeys file not found");
            }
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(keysFile)))) {
                String accessKey = reader.readLine();
                String secretKey = reader.readLine();

                BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
                sqsClient = new AmazonSQSClient(awsCredentials, clientConfig);
            }
            catch (IOException e) {
                throw new DatabaseException(e);
            }

            // The queue URL is read from the queue config passed to this consumer.
            JsonDict queueConfig = new JsonDict(jsonQueueConfig);
            url = queueConfig.getString("url");
        }

        @Override
        public List<com.sonicbase.queue.Message> receive() {
            // Long-poll for up to 10 seconds and fetch at most 10 messages per call.
            ReceiveMessageRequest request = new ReceiveMessageRequest(url);
            request.setMaxNumberOfMessages(10);
            request.setWaitTimeSeconds(10);
            ReceiveMessageResult receivedMessages = sqsClient.receiveMessage(request.withMessageAttributeNames("All"));

            List<com.amazonaws.services.sqs.model.Message> innerMessages = receivedMessages.getMessages();
            List<com.sonicbase.queue.Message> resultMessages = new ArrayList<>();
            for (com.amazonaws.services.sqs.model.Message message : innerMessages) {
                resultMessages.add(new AWSMessage(message, message.getBody()));
            }
            return resultMessages;
        }

        @Override
        public void acknowledgeMessage(com.sonicbase.queue.Message message) {
            // Deleting the message from the queue is how SQS acknowledges it.
            sqsClient.deleteMessage(url, ((AWSMessage) message).message.getReceiptHandle());
        }
    }