SonicBase. In-memory embedded or distributed streaming sql database

Stored Procedures - Example 2

Overview

This example illustrates how you can store results in a temporary table and allow the client to traverse this table for the results. Here we capture the result records in the "RecordEvaluator" callback. Additionally we return "false" from the RecordEvaluator to prevent the results from being returned to the stored procedure via the result set of the query. Note that we return the table name to the client in the "finalize" method.

It is important to call SonicBasePreparedStatement.restrictToThisServer(true) before executing the query. This keeps traversal on this server.

Client Code


  @Test
  public void test2() throws SQLException {
      String tableName = null;
      try {
        String query = "call procedure 'com.sonicbase.procedure.MyStoredProcedure2', 1000";
        try (PreparedStatement procedureStmt = conn.prepareStatement(query);
             ResultSet procedureRs = procedureStmt.executeQuery()) {
          if (procedureRs.next()) {
            tableName = procedureRs.getString("tableName");
          }
          System.out.println("tableName=" + tableName);
        }

        try (PreparedStatement resultsStmt = conn.prepareStatement("select * from " + tableName);
            ResultSet rs = resultsStmt.executeQuery()) {
          int offset = 3;
          while (rs.next()) {
            System.out.println("id=" + rs.getLong("id1") + ", socialsecuritynumber=" +
                rs.getString("socialsecuritynumber") + ", gender=" + rs.getString("gender"));
          }
        }
        System.out.println("finished");
      }
      finally {
        try {
          if (tableName != null) {
            try (PreparedStatement stmt = conn.prepareStatement("drop table " + tableName)) {
              stmt.executeUpdate();
            }
          }
        }
        catch (Exception e) {
          e.printStackTrace();
        }
      }
    }

Stored Procedure Code


public class MyStoredProcedure2 implements StoredProcedure {

  public void init(StoredProcedureContext context) {
    try {
      try (SonicBasePreparedStatement stmt = context.getConnection().prepareSonicBaseStatement(
          context, "create table " + "results_" + context.getStoredProdecureId() + " (id1 BIGINT, num DOUBLE, socialSecurityNumber " +
              "VARCHAR(20), gender VARCHAR(8), PRIMARY KEY (id1))")) {
        stmt.executeUpdate();
      }
    }
    catch (Exception e) {
      throw new DatabaseException(e);
    }
  }

  @Override
  public StoredProcedureResponse execute(final StoredProcedureContext context) {
    try {
      String query = "select * from persons where id1>1 and id1<500 and gender='m'";

      try (SonicBasePreparedStatement stmt = context.getConnection().prepareSonicBaseStatement(context, query)) {
        stmt.restrictToThisServer(true);

        final List<Record> batch = new ArrayList<>();
        stmt.executeQueryWithEvaluator(new RecordEvaluator() {
          @Override
          public boolean evaluate(final StoredProcedureContext context, Record record) {
            if (record.getDatabase().equalsIgnoreCase("db") &&
                record.getTableName().equalsIgnoreCase("persons")) {
              Long id = record.getLong("id1");
              if (id != null && id > 2 && id < context.getParameters().getInt(2) && passesComplicatedLogic(record)) {
                if (!record.isDeleting()) {
                  batch.add(record);
                  if (batch.size() >= 200) {
                    insertBatch(context, batch);
                    batch.clear();
                  }
                }
              }
            }
            return false;
          }
        });

        if (batch.size() != 0) {
          insertBatch(context, batch);
        }
      }
    }
    catch (Exception e) {
      throw new DatabaseException(e);
    }
    return null;
  }

  private boolean passesComplicatedLogic(Record record) {
    //put complicated logic here
    return true;
  }

  public void insertBatch(StoredProcedureContext context, List<Record> batch) {
    try {
      PreparedStatement insertStmt = context.getConnection().prepareStatement("insert into " +
           "results_" + context.getStoredProdecureId() + " (id1, socialsecuritynumber, gender) VALUES (?, ?, ?)");
      for (Record record : batch) {
        insertStmt.setLong(1, record.getLong("id1"));
        insertStmt.setString(2, record.getString("socialsecuritynumber"));
        insertStmt.setString(3, record.getString("gender"));
        insertStmt.addBatch();
      }
      insertStmt.executeBatch();
      insertStmt.close();
    }
    catch (Exception e) {
      throw new DatabaseException(e);
    }
  }

  public StoredProcedureResponse finalize(StoredProcedureContext context,
                                          List<StoredProcedureResponse> responses) {
    Record record = context.createRecord();
    record.setString("tableName", "results_" + context.getStoredProdecureId());

    StoredProcedureResponse response = context.createResponse();
    response.addRecord(record);

    return response;
  }
}