Streaming Consumer

Overview

With SonicBase you can consume messages from an external data source, which is especially useful where SonicBase is used as an analytics database. The currently supported streaming platforms are Kafka, Amazon Web Services Kinesis, and Amazon Web Services Simple Queue Service. Additionally, custom streams providers are supported. To initiate an insert, update, or delete, you publish a message to your stream in the format specified below.

Configuration

You configure streams in the cluster config file by creating a "streams" section that contains an array of "consumers". Each consumer has a "className" and provider-specific config.

Streams Configuration Examples


{
    "dataDirectory": "$HOME/db-data",
    "installDirectory": "$HOME/sonicbase",
    "compressRecords": false,
    "useUnsafe": true,
    "maxJavaHeap": "60%",
    "user": "ubuntu",
    "clientIsPrivate": false,
    "streams": {
        "processorThreadCount" : 8,
        "consumers": [
        {
            "className": "com.sonicbase.streams.AWSSQSConsumer",
            "threadCount": 1,
            "url": "https://sqs.us-east-1.amazonaws.com/892217715566/benchmark-queue2"
        },
        {
            "className": "com.sonicbase.streams.KafkaConsumer",
            "servers: "localhost:9092"
            "topic: "test"
        },
        {
            "className" : "com.sonicbase.streams.AWSKinesisConsumer",
            "region" : "us-east-1",
            "streamName" : "sonicbase-test-stream",
            "getRecordsSleepMillis" : 200,
            "getRecordsRequestCount" : 1000
        }
        ]
    }
}

Message Format


The "events" section contains an array of actions to perform. For "insert", the records contain the fields you want to store on the record. For "update" you specify the record as it appeared before the update (fields you want the candidate record to match) and the record as you want it to appear after the update. For "delete", the record contains the fields you want the candidate record to match.

Binary fields must be base64 encoded.
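
For instance, a minimal sketch of encoding a binary column value before embedding it in the event JSON (the "photo" field is hypothetical):

import java.util.Base64;

public class Base64FieldExample {
  public static void main(String[] args) {
    byte[] photoBytes = {0x01, 0x02, 0x03}; // hypothetical binary column value
    // base64-encode the bytes so they can travel as a JSON string field
    System.out.println("\"photo\": \"" + Base64.getEncoder().encodeToString(photoBytes) + "\"");
  }
}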

Example Message


{
    "events": [
        {
            "_sonicbase_dbname": "db",
            "_sonicbase_tablename": "my_table",
            "_sonicbase_action": "insert"
            "id": 123,
            "name": "bob"
        },
        {
            "_sonicbase_dbname": "db",
            "_sonicbase_tablename": "my_table",
            "_sonicbase_action": "update",
            "before": {
                "id": 124,
                "name": "sue"
            },
            "after": {
                "id": 124,
                "name": "new name"
            }
        },
        {
            "_sonicbase_dbname": "db",
            "_sonicbase_tablename": "my_table",
            "_sonicbase_action": "delete"
            "id": 123,
            "name": "bob"
        }
    ]
}

Consumer Specifics

AWSSQSConsumer

The message is sent as a message attribute named "message". The content of the attribute is a gzip-compressed binary message string.
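
As a sketch, publishing a message in this format with the AWS SDK for Java (v1) might look like the following; the queue URL is a placeholder:

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.SendMessageRequest;

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class SqsPublishExample {
  public static void main(String[] args) throws Exception {
    String json = "{\"events\": [{\"_sonicbase_dbname\": \"db\", " +
        "\"_sonicbase_tablename\": \"my_table\", " +
        "\"_sonicbase_action\": \"insert\", \"id\": 123, \"name\": \"bob\"}]}";

    // gzip the message string, as the consumer expects
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
      gzip.write(json.getBytes(StandardCharsets.UTF_8));
    }

    // attach the compressed payload as the "message" binary attribute
    AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
    sqs.sendMessage(new SendMessageRequest()
        .withQueueUrl("https://sqs.us-east-1.amazonaws.com/123456789012/my-queue") // placeholder
        .withMessageBody("events") // body is unused; the payload travels in the attribute
        .addMessageAttributesEntry("message", new MessageAttributeValue()
            .withDataType("Binary")
            .withBinaryValue(ByteBuffer.wrap(out.toByteArray()))));
  }
}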

KafkaConsumer

The message is sent as a plain string.
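
A minimal publishing sketch, assuming the Kafka Java client and the "servers" and "topic" values from the sample config above:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaPublishExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    String json = "{\"events\": [{\"_sonicbase_dbname\": \"db\", " +
        "\"_sonicbase_tablename\": \"my_table\", " +
        "\"_sonicbase_action\": \"insert\", \"id\": 123, \"name\": \"bob\"}]}";

    // the consumer reads the record value as a plain string
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      producer.send(new ProducerRecord<>("test", json));
    }
  }
}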

AWSKinesisConsumer

The message is sent as a gzip-compressed binary message string.
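
A minimal publishing sketch, assuming the AWS SDK for Java (v1) Kinesis client and the "region" and "streamName" values from the sample config; the partition key is arbitrary:

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.PutRecordRequest;

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class KinesisPublishExample {
  public static void main(String[] args) throws Exception {
    String json = "{\"events\": [{\"_sonicbase_dbname\": \"db\", " +
        "\"_sonicbase_tablename\": \"my_table\", " +
        "\"_sonicbase_action\": \"insert\", \"id\": 123, \"name\": \"bob\"}]}";

    // gzip the message string, as the consumer expects
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
      gzip.write(json.getBytes(StandardCharsets.UTF_8));
    }

    AmazonKinesis kinesis = AmazonKinesisClientBuilder.standard()
        .withRegion("us-east-1").build();
    PutRecordRequest request = new PutRecordRequest();
    request.setStreamName("sonicbase-test-stream");
    request.setPartitionKey("partition-0"); // arbitrary partition key
    request.setData(ByteBuffer.wrap(out.toByteArray()));
    kinesis.putRecord(request);
  }
}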

Administration

In order for AWS integration to work, you must have a file named "<cluster>-awskeys" located in the "keys" subdirectory of the install directory. This file contains the access key on the first line and the secret key on the second line.
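
For example, for a hypothetical cluster named "my-cluster", keys/my-cluster-awskeys would contain two lines like these (placeholder values from the AWS documentation):

AKIAIOSFODNN7EXAMPLE
wJalrXUtnFfEMI/K7MDENG/bPxRfiCYEXAMPLEKEY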

Start Consuming

When the cluster starts, consumers are disabled. To start consuming, type "start streams consumers" in the admin client.

Stop Consuming

In the admin client type "stop streams consumers" to stop consuming across the cluster.

Error Handling

When an error occurs, the message is logged to disk for later processing. The log files are located at <dataDirectory>/stream_errors/<shard>/<replica>/<date>.error.

Additionally, the "handleError" method on your consumer will be called for the messages that got an error.

Custom Streams Providers

You can hook into any streaming system by providing a custom provider. Your class must implement the following interface, located in sonicbase-<version>.jar.


package com.sonicbase.streams;

import java.util.List;

public interface StreamsConsumer {
  int init(String cluster, String jsonConfig, String jsonStreamConfig);
  void initThread();
  List<Message> receive();
  void acknowledgeMessages(List<Message> messages);
  void handleError(List<Message> messages, Exception e);
  void shutdown();
}

"init" is called once per consumer. You need to return the number of threads you want created for your consumer.

"initThread" is called once per thread that is owned by your consumer.

"receive" is called for you to return messages from your provider.

"acknowledgeMessages" is called for you to acknowledge the messages with your provider if applicable.

"handleError" is called when there is an exception processing your messages.
"shutdown" is called for you to disconnect from your provider.

The supporting class "Message" is shown below. You may derive a class from Message to store provider-specific message details (such as a receipt handle).


public class Message {
  private String body;

  public Message() {
  }

  public Message(String body) {
    this.body = body;
  }

  public void setBody(String body) {
    this.body = body;
  }

  public String getBody() {
    return body;
  }
}

Example Custom Provider

Below is an example provider based on the Amazon Web Services Simple Queue Service consumer built into SonicBase. For your provider to work, you must place the jar containing your code and its dependencies in the "lib" directory under the install directory and deploy it across the cluster.

package com.sonicbase.streams;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.sonicbase.query.DatabaseException;
import org.apache.commons.io.IOUtils;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;

public class AWSSQSConsumer implements StreamsConsumer {

  private static org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(AWSSQSConsumer.class);

  private String url;
  private AmazonSQS sqsClient;
  private boolean shutdown;

  class AWSMessage extends Message {
    private final com.amazonaws.services.sqs.model.Message message;

    public AWSMessage(com.amazonaws.services.sqs.model.Message message, String body) {
      super(body);
      this.message = message;
    }
  }

  public File getInstallDir(ObjectNode config) {
    String dir = config.get("installDirectory").asText();
    return new File(dir.replace("$HOME", System.getProperty("user.home")));
  }

  public void shutdown() {
    this.shutdown = true;
    sqsClient.shutdown();
  }

  @Override
  public int init(String cluster, String jsonConfig, String jsonStreamConfig) {
    try {
      logger.info("aws sqs init - begin");
      final ClientConfiguration clientConfig = new ClientConfiguration();
      clientConfig.setMaxConnections(100);
      clientConfig.setRequestTimeout(20_000);
      clientConfig.setConnectionTimeout(60_000);

      AmazonSQSClientBuilder builder = AmazonSQSClient.builder();

      ObjectMapper mapper = new ObjectMapper();
      ObjectNode config = (ObjectNode) mapper.readTree(jsonConfig);
      File installDir = getInstallDir(config);
      File keysFile = new File(installDir, "/keys/" + cluster + "-awskeys");
      if (!keysFile.exists()) {
        builder.setCredentials(new InstanceProfileCredentialsProvider(true));
      }
      else {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(keysFile)))) {
          String accessKey = reader.readLine();
          String secretKey = reader.readLine();

          BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
          builder.setCredentials(new AWSStaticCredentialsProvider(awsCredentials));
        }
        catch (IOException e) {
          throw new DatabaseException(e);
        }
      }
      builder.setClientConfiguration(clientConfig);
      sqsClient = builder.build();

      ObjectNode streamConfig = (ObjectNode) mapper.readTree(jsonStreamConfig);
      url = streamConfig.get("url").asText();

      logger.info("aws sqs init - end: url=" + url);
      return streamConfig.get("threadCount").asInt();
    }
    catch (Exception e) {
      throw new DatabaseException(e);
    }
  }

  @Override
  public void initThread() {
  }

  @Override
  public List<Message> receive() {
    try {
      ReceiveMessageRequest request = new ReceiveMessageRequest(url);
      request.setMaxNumberOfMessages(10);
      request.setWaitTimeSeconds(10);
      ReceiveMessageResult receivedMessages = sqsClient.receiveMessage(request.withMessageAttributeNames("All"));

      List<com.amazonaws.services.sqs.model.Message> innerMessages = receivedMessages.getMessages();
      List<Message> resultMessages = new ArrayList<>();
      for (com.amazonaws.services.sqs.model.Message message : innerMessages) {
        ByteBuffer buffer = message.getMessageAttributes().get("message").getBinaryValue();
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);

        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(bytes))) {
          bytes = IOUtils.toByteArray(in);
        }

        resultMessages.add(new AWSMessage(message, new String(bytes, "utf-8")));
      }
      return resultMessages;
    }
    catch (Exception e) {
      throw new DatabaseException(e);
    }
  }

  @Override
  public void acknowledgeMessages(List<Message> messages) {
    for (Message message : messages) {
      sqsClient.deleteMessage(url, ((AWSMessage) message).message.getReceiptHandle());
    }
  }

  @Override
  public void handleError(List<Message> messages, Exception e) {
  }
}
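
To activate the provider, register it in the "consumers" array of the cluster config alongside any settings your "init" method reads. A hypothetical entry (class name and settings are illustrative):

{
    "streams": {
        "consumers": [
        {
            "className": "com.acme.streams.MyCustomConsumer",
            "threadCount": 2,
            "connectString": "my-provider-host:1234"
        }
        ]
    }
}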