SonicBase. In-memory embedded or distributed streaming sql database

Streaming Producer

Overview

With SonicBase you can publish messages from from inserts, updates and deletes that occur to the database. The current streams platforms that are supported include Kafka, Amazon Web Services Kinesis, and Amazon Web Services Simple Message Queue Service. Additionally, we support custom streams providers. Your streams will receive messages from SonicBase in the format described below.

Performance Considerations

You must have an extremely fast streams provider when hooking a producer or consumer into Sonicbase. Otherwise performance will greatly suffer.

Configuration

You configure streams integration in the cluster config file. In the configuration you create a "streams" section where you specify an array of "producer"s. Each producer has a "className", "maxBatchSize" and provider specific config.

Example


dataDirectory: $HOME/db-data
installDirectory: $HOME/sonicbase
      ...
streams:
  processorThreadCount: 8
  producers:
  - producer:
      className: com.sonicbase.streams.AWSKinesisProducer
      region: us-east-1
      streamName: sonicbase-test-stream23
      maxBatchSize: 200
  - producer:
      servers: 10.0.0.167:9092
      maxBatchSize: 200
      topic: test
      kafka.batch.size: 16384
      kafka.linger.ms: 2
      kafka.acks: 1
      className: com.sonicbase.streams.KafkaProducer
  - producer:
      className: com.sonicbase.streams.AWSSQSProducer
      maxBatchSize: 100
      url: https://sqs.us-east-1.amazonaws.com/6/benchmark-queue

Message Format

Administration

In order for AWS integration to work you must have a file named "<cluster>-awskeys" located in the "keys" subdirectory of the install directory. This file contains the accesskey on the first line and the secret key on the second line.

Custom Message Providers

You can hook into any messaging system by providing a custom provider. Your class must implement the following interface located in sonicbase-<version>.jar.


package com.sonicbase.streams;
public interface StreamProducer {
void init(String cluster, String jsonConfig, String jsonStreamConfig);
void publish(List<String> messages);
void shutdown();
}
"init" is called once to initialize your producer.

"publish" is called for you send messages to your streams.

"shutdown" is called for you to disconnect from your provider.

Example Custom Provider

Below is an example provider that is based on the AWS Simple Queue Service provider built into SonicBase. For your provider to work, you must place the jar containing your code and any dependencies in the "lib" directory under the install directory and deploy it across the cluster.

package com.sonicbase.streams;

public class AWSSQSProducer implements StreamsProducer {

  private static org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(AWSSQSProducer.class);

  private String url;
  private AmazonSQS sqsClient;

  public File getInstallDir(ObjectNode config) {
    String dir = config.get("installDirectory").asText();
    return new File(dir.replace("$HOME", System.getProperty("user.home")));
  }

  @Override
  public void init(String cluster, String jsonConfig, String jsonStreamConfig) {
    try {
      logger.info("aws sqs producer init - begin");
      final ClientConfiguration clientConfig = new ClientConfiguration();
      clientConfig.setMaxConnections(100);
      clientConfig.setRequestTimeout(20_000);
      clientConfig.setConnectionTimeout(60_000);

      AmazonSQSClientBuilder builder = AmazonSQSClient.builder();

      ObjectMapper mapper = new ObjectMapper();
      ObjectNode config = (ObjectNode) mapper.readTree(jsonConfig);
      File installDir = getInstallDir(config);
      File keysFile = new File(installDir, "/keys/" + cluster + "-awskeys");
      if (!keysFile.exists()) {
        builder.setCredentials(new InstanceProfileCredentialsProvider(true));
      }
      else {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(keysFile)))) {
          String accessKey = reader.readLine();
          String secretKey = reader.readLine();

          BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
          builder.setCredentials(new AWSStaticCredentialsProvider(awsCredentials));
        }
        catch (IOException e) {
          throw new DatabaseException(e);
        }
      }
      builder.setClientConfiguration(clientConfig);
      sqsClient = builder.build();

      ObjectNode streamConfig = (ObjectNode) mapper.readTree(jsonStreamConfig);
      url = streamConfig.get("url").asText();

      logger.info("aws sqs producer init - end");
    }
    catch (Exception e) {
      throw new DatabaseException(e);
    }
  }

  @Override
  public void publish(List<String> messages) {
    try {
      Map<String, MessageAttributeValue> attributes = new HashMap<>();
      List<SendMessageBatchRequestEntry> entries = new ArrayList<>();
      int offset = 0;
      for (String message : messages) {
        SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();

        ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
        GZIPOutputStream out = new GZIPOutputStream(bytesOut);
        out.write(message.getBytes("utf-8"));
        out.close();

        entry.setId(String.valueOf(offset));
        entry.setMessageBody("SonicBase Publish");
        MessageAttributeValue attrib = new MessageAttributeValue().withDataType("Binary").withBinaryValue(ByteBuffer.wrap(bytesOut.toByteArray()));
        attributes.put(String.valueOf(offset), attrib);
        entry.addMessageAttributesEntry("message", attrib);
        entries.add(entry);
        offset++;
      }
      SendMessageBatchResult result = sqsClient.sendMessageBatch(url, entries);

      List<BatchResultErrorEntry> failed = result.getFailed();
      for (int i = 0; i < 10 && !failed.isEmpty(); i++) {
        entries.clear();

        logger.error("Error publishing message: count=" + failed.size());

        for (BatchResultErrorEntry curr : failed) {
          SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
          entry.setMessageBody(curr.getMessage());
          String id = curr.getId();
          entry.addMessageAttributesEntry("message", attributes.get(id));
          entries.add(entry);
        }

        result = sqsClient.sendMessageBatch(url, entries);
        failed = result.getFailed();

        if (i == 9 && !failed.isEmpty()) {
          throw new DatabaseException("Error publishing message: count=" + failed.size());
        }
      }
    }
    catch (Exception e) {
      throw new DatabaseException(e);
    }
  }

  @Override
  public void shutdown() {
    sqsClient.shutdown();
  }
}