IBM Cloud Docs
Creating an IoT historian using partitioned databases

Creating an IoT historian using partitioned databases

Partitioned databases can be difficult to understand in the abstract. In this document you will learn about a concrete use-case to learn how to apply concepts needed to model data for partitioned databases.

We take the Internet of Things domain and look at using IBM Cloudant as a historian for device readings. Say that the devices provide sensor readings on pieces of infrastructure like roads or bridges.

Review the following assumptions:

  • Hundreds or thousands of devices that report readings.
  • Each device has a unique ID.
  • Each piece of infrastructure has a unique ID.
  • Devices aren't moved between pieces of infrastructure.
  • Each device writes a reading to IBM Cloudant every 10 seconds. Likely this reading is delivered by using a message bus to IBM Cloudant.

For this use-case, we will use the device ID for the partition key, and the reading timestamp for the remainder of the document ID:

device-123456:20181211T11:13:24.123456Z

The timestamp could also be an epoch timestamp.

This approach allows the data for each device to be queried efficiently by using partitioned indexes. However, global indexes might need to be used to create views over multiple devices, for example, all devices on a specific piece of infrastructure.

For illustrative purposes, let's make the scenario a bit more complicated. Assume that the application mostly needs to read all sensor data for a specific piece of infrastructure rather than for individual devices.

In this application, you query by infrastructure item to be most efficient, so partitioning the data by piece of infrastructure makes a lot more sense than by ID. This practice would allow all the devices for a specific piece of infrastructure to be efficiently queried as a group.

For the rare queries by device, you use two approaches:

  1. Build a global index that is keyed by device and query it. This approach is more effective if queries to individual devices are rare and not repeated.
  2. Build a global index-mapping device to infrastructure, then issue partition queries to the infrastructure partition. This approach makes sense if repeated queries to specific devices are used as the mapping can be cached. This approach is used for this application.

Let's look at how this approach works out. Let's look at four queries:

  1. Readings for all time for a piece of infrastructure.
  2. Readings for today for a piece of infrastructure.
  3. Readings for all time for a specific device.
  4. Readings for today for a specific device.

Creating the database

To create a partitioned database, pass true as the partitioned argument to the database creation request:

All tutorials in this section will use readings as the example database.

curl -X PUT "$SERVICE_URL/readings?partitioned=true"
import com.ibm.cloud.cloudant.v1.Cloudant;
import com.ibm.cloud.cloudant.v1.model.Ok;
import com.ibm.cloud.cloudant.v1.model.PutDatabaseOptions;

Cloudant service = Cloudant.newInstance();

PutDatabaseOptions databaseOptions = new PutDatabaseOptions.Builder()
.db("readings")
.partitioned(true)
.build();

Ok response =
service.putDatabase(databaseOptions).execute()
.getResult();

System.out.println(response);
const { CloudantV1 } = require('@ibm-cloud/cloudant');

const service = CloudantV1.newInstance({});

service.putDatabase({
  db: 'readings',
  partitioned: true
}).then(response => {
  console.log(response.result);
});
from ibmcloudant.cloudant_v1 import CloudantV1

service = CloudantV1.new_instance()

response = service.put_database(db='readings', partitioned=True).get_result()

print(response)
putDatabaseOptions := service.NewPutDatabaseOptions(
"readings",
)
putDatabaseOptions.SetPartitioned(true)

ok, response, err := service.PutDatabase(putDatabaseOptions)
if err != nil {
panic(err)
}

b, _ := json.MarshalIndent(ok, "", "  ")
fmt.Println(string(b))

The previous Go example requires the following import block:

import (
   "encoding/json"
   "fmt"
   "github.com/IBM/cloudant-go-sdk/cloudantv1"
)

All Go examples require the service object to be initialized. For more information, see the API documentation's Authentication section for examples.

Document structure

First, let's define a simple document format to work with:

{
  "deviceID": "device-123456",
  "infrastructureID": "bridge-9876",
  "ts": "20181211T11:13:24.123456Z",
  "reading": {
    "temperature": {
      "value": 12,
      "unit": "c"
    }
  }
}

This document uses the partitioning scheme based on a piece of infrastructure. The document ID might include the infrastructure ID as the partition key, and include both device and timestamp as the document key:

bridge-9876:device-123456-20181211T11:13:24.123456Z

Creating indexes

For the queries described previously, you need two indexes:

  1. A global index-mapping device ID to infrastructure ID
  2. A partitioned index-mapping device ID to reading

Creating a global view index

A view index is the most efficient way to do the simple device ID to infrastructure ID mapping. To define it, upload a design document with options.partitioned set to false as this index is global. While in a real map function you'd want to be more defensive around field existence, this document would look something like this:

{
  "options": {
    "partitioned": false
  },
  "views": {
    "by-device": {
      "map": "function(doc) { emit(doc.deviceID, doc.infrastructureID) }"
    }
  }
}

Assuming the previous document in ./view.json, this document is uploaded to the database by using the following command:

curl -X PUT "$SERVICE_URL/readings/_design/infrastructure-mapping" -H 'Content-Type: application/json' --data @view.json

For more language examples that show creating a global view, see the Storing the view definition guide, or the Create or modify a design document section in API Docs.

Creating a partitioned IBM Cloudant Query index

To return the readings for a specific device from a partition, you can use an IBM Cloudant Query index. For this document, use POST to _index with an index definition that includes the partitioned field set to true.

For Query index definitions, the partitioned field isn't nested inside an options object.

For these queries, you need two partitioned indexes:

  1. By timestamp
  2. By device ID and timestamp

Uploading partitioned index by timestamp

Upload the index by timestamp to the database by using this command:

curl -X POST "$SERVICE_URL/readings/_index" -H 'Content-Type: application/json' --data '{
   "index": {
      "fields": [
         {"ts": "asc"}
      ]
   },
   "name": "timestamped-readings",
   "type": "json",
   "partitioned": true
}'
import com.ibm.cloud.cloudant.v1.Cloudant;
import com.ibm.cloud.cloudant.v1.model.IndexDefinition;
import com.ibm.cloud.cloudant.v1.model.IndexField;
import com.ibm.cloud.cloudant.v1.model.IndexResult;
import com.ibm.cloud.cloudant.v1.model.PostIndexOptions;

Cloudant service = Cloudant.newInstance();

IndexField indexField = new IndexField.Builder()
    .add("ts", "asc")
    .build();

IndexDefinition index = new IndexDefinition.Builder()
    .addFields(indexField)
    .build();

PostIndexOptions indexOptions = new PostIndexOptions.Builder()
    .db("readings")
    .index(index)
    .name("timestamped-readings")
    .type("json")
    .partitioned(true)
    .build();

IndexResult response =
    service.postIndex(indexOptions).execute()
        .getResult();

System.out.println(response);
const { CloudantV1 } = require('@ibm-cloud/cloudant');

const service = CloudantV1.newInstance({});

const indexField = {
  ts: 'asc'
}

const index = {
  fields: [indexField]
}

service.postIndex({
  db: 'readings',
  name: 'timestamped-readings',
  index: index,
  type: 'json',
  partitioned: true
}).then(response => {
  console.log(response.result);
});
from ibmcloudant.cloudant_v1 import CloudantV1, IndexDefinition, IndexField

service = CloudantV1.new_instance()

index_field = IndexField(ts='asc')

index = IndexDefinition(
    fields=[index_field]
)

response = service.post_index(
    db='readings',
    name='timestamped-readings',
    index=index,
    type='json',
    partitioned=True
).get_result()

print(response)
var indexField cloudantv1.IndexField
indexField.SetProperty("ts", core.StringPtr("asc"))

postIndexOptions := service.NewPostIndexOptions(
  "readings",
  &cloudantv1.IndexDefinition{
    Fields: []cloudantv1.IndexField{
      indexField,
    },
  },
)
postIndexOptions.SetName("timestamped-readings")
postIndexOptions.SetType("json")
postIndexOptions.SetPartitioned(true)

indexResult, response, err := service.PostIndex(postIndexOptions)
if err != nil {
  panic(err)
}

b, _ := json.MarshalIndent(indexResult, "", "  ")
fmt.Println(string(b))

The previous Go example requires the following import block:

import (
   "encoding/json"
   "fmt"
   "github.com/IBM/cloudant-go-sdk/cloudantv1"
   "github.com/IBM/go-sdk-core/v5/core"
)

All Go examples require the service object to be initialized. For more information, see the API documentation's Authentication section for examples.

Uploading partitioned index by device ID and timestamp

Upload the index by device ID and timestamp to the database by using this command:

curl -X POST "$SERVICE_URL/readings/_index" -H 'Content-Type: application/json' --data '{
   "index": {
      "fields": [
         {"deviceID": "asc"},
         {"ts": "asc"}
      ]
   },
   "name": "deviceID-readings",
   "type": "json",
   "partitioned": true
}'
import com.ibm.cloud.cloudant.v1.Cloudant;
import com.ibm.cloud.cloudant.v1.model.IndexDefinition;
import com.ibm.cloud.cloudant.v1.model.IndexField;
import com.ibm.cloud.cloudant.v1.model.IndexResult;
import com.ibm.cloud.cloudant.v1.model.PostIndexOptions;

import java.util.Arrays;

Cloudant service = Cloudant.newInstance();

IndexField indexField1 = new IndexField.Builder()
    .add("deviceID", "asc")
    .build();

IndexField indexField2 = new IndexField.Builder()
    .add("ts", "asc")
    .build();

IndexDefinition index = new IndexDefinition.Builder()
    .fields(Arrays.asList(indexField1, indexField2))
    .build();

PostIndexOptions indexOptions = new PostIndexOptions.Builder()
    .db("readings")
    .index(index)
    .name("deviceID-readings")
    .type("json")
    .partitioned(true)
    .build();

IndexResult response =
    service.postIndex(indexOptions).execute()
        .getResult();

System.out.println(response);
const { CloudantV1 } = require('@ibm-cloud/cloudant');

const service = CloudantV1.newInstance({});

const indexField1 = {
  deviceID: 'asc'
}

const indexField2 = {
  ts: 'asc'
}

const index = {
  fields: [indexField1, indexField2]
}

service.postIndex({
  db: 'readings',
  name: 'deviceID-readings',
  index: index,
  type: 'json',
  partitioned: true
}).then(response => {
  console.log(response.result);
});
from ibmcloudant.cloudant_v1 import CloudantV1, IndexDefinition, IndexField

service = CloudantV1.new_instance()

index_field1 = IndexField(deviceID='asc')
index_field2 = IndexField(ts='asc')

index = IndexDefinition(
    fields=[index_field1, index_field2]
)

response = service.post_index(
    db='readings',
    name='deviceID-readings',
    index=index,
    type='json',
    partitioned=True
).get_result()

print(response)
var indexField1 cloudantv1.IndexField
var indexField2 cloudantv1.IndexField
indexField1.SetProperty("deviceID", core.StringPtr("asc"))
indexField2.SetProperty("ts", core.StringPtr("asc"))

postIndexOptions := service.NewPostIndexOptions(
  "readings",
  &cloudantv1.IndexDefinition{
    Fields: []cloudantv1.IndexField{
      indexField1,
      indexField2,
    },
  },
)
postIndexOptions.SetName("deviceID-readings")
postIndexOptions.SetType("json")
postIndexOptions.SetPartitioned(true)

indexResult, response, err := service.PostIndex(postIndexOptions)
if err != nil {
  panic(err)
}

b, _ := json.MarshalIndent(indexResult, "", "  ")
fmt.Println(string(b))

The previous Go example requires the following import block:

import (
   "encoding/json"
   "fmt"
   "github.com/IBM/cloudant-go-sdk/cloudantv1"
   "github.com/IBM/go-sdk-core/v5/core"
)

All Go examples require the service object to be initialized. For more information, see the API documentation's Authentication section for examples.

Making queries

Overall, you want to make four queries:

  1. Readings for all time for a piece of infrastructure.
  2. Readings for today for a piece of infrastructure.
  3. Readings for all time for a specific device.
  4. Readings for today for a specific device.

Finding all readings for a piece of infrastructure

These partitions are infrastructure-based, so you can use _all_docs for a partition. For example, query all readings for the bridge-9876 infrastructure piece by using the following command.

curl -X POST "$SERVICE_URL/readings/_partition/bridge-9876/_all_docs" -H 'Content-Type: 
application/json' --data '{"include_docs": true}'
import com.ibm.cloud.cloudant.v1.Cloudant;
import com.ibm.cloud.cloudant.v1.model.AllDocsResult;
import com.ibm.cloud.cloudant.v1.model.PostPartitionAllDocsOptions;
Cloudant service = Cloudant.newInstance();

PostPartitionAllDocsOptions allDocsOptions =
    new PostPartitionAllDocsOptions.Builder()
        .db("readings")
        .partitionKey("bridge-9876")
        .includeDocs(true)
        .build();

AllDocsResult response =
    service.postPartitionAllDocs(allDocsOptions).execute()
        .getResult();

System.out.println(response);
import { CloudantV1 } from '@ibm-cloud/cloudant';

const service = CloudantV1.newInstance({});

service.postPartitionAllDocs({
  db: 'readings',
  partitionKey: 'bridge-9876',
  includeDocs: true
}).then(response => {
  console.log(response.result);
});
from ibmcloudant.cloudant_v1 import CloudantV1

service = CloudantV1.new_instance()

response = service.post_partition_all_docs(
  db='readings',
  partition_key='bridge-9876',
  include_docs=True
).get_result()

print(response)
postPartitionAllDocsOptions := service.NewPostPartitionAllDocsOptions(
  "readings",
  "bridge-9876",
)
postPartitionAllDocsOptions.SetIncludeDocs(true)

allDocsResult, response, err := service.PostPartitionAllDocs(postPartitionAllDocsOptions)
if err != nil {
  panic(err)
}

b, _ := json.MarshalIndent(allDocsResult, "", "  ")
fmt.Println(string(b))

The previous Go example requires the following import block:

import (
   "encoding/json"
   "fmt"
   "github.com/IBM/cloudant-go-sdk/cloudantv1"
)

All Go examples require the service object to be initialized. For more information, see the API documentation's Authentication section for examples.

Finding recent readings for a piece of infrastructure

This query needs to use the partitioned timestamped-readings index. You can issue a query to the partition to get the readings for today, assuming today is 13 December 2018.

The partition is embedded in the HTTP path when you issue the request to IBM Cloudant:

curl -X POST "$SERVICE_URL/readings/_partition/bridge-9876/_find" -H 'Content-Type: 
application/json' --data '{
    "selector": {
        "ts": { "$gte": "20181213"}
    }
}'
import com.ibm.cloud.cloudant.v1.Cloudant;
import com.ibm.cloud.cloudant.v1.model.FindResult;
import com.ibm.cloud.cloudant.v1.model.PostPartitionFindOptions;
import com.ibm.cloud.cloudant.v1.model.Selector;

import java.util.HashMap;
import java.util.Map;

Cloudant service = Cloudant.newInstance();

Map greaterThanOrEqualWithTs = new HashMap<>();
greaterThanOrEqualWithTs.put("$gte", "20181213");

Selector selector = new Selector();
selector.put("ts", greaterThanOrEqualWithTs);

PostPartitionFindOptions findOptions =
    new PostPartitionFindOptions.Builder()
        .db("readings")
        .partitionKey("bridge-9876")
        .selector(selector)
        .build();

FindResult response =
    service.postPartitionFind(findOptions).execute()
        .getResult();

System.out.println(response);
import { CloudantV1 } from '@ibm-cloud/cloudant';

const service = CloudantV1.newInstance({});

const selector: CloudantV1.Selector = {
  ts: {'$gte': '20181213'}
}
service.postPartitionFind({
  db: 'readings',
  partitionKey: 'bridge-9876',
  selector: selector
}).then(response => {
  console.log(response.result);
});
from ibmcloudant.cloudant_v1 import CloudantV1

service = CloudantV1.new_instance()

response = service.post_partition_find(
  db='readings',
  partition_key='bridge-9876',
  selector={'ts': {'$gte': '20181213'}}
).get_result()

print(response)
selector := map[string]interface{}{
  "ts": map[string]string{
    "$gte": "20181213",
  },
}

postPartitionFindOptions := service.NewPostPartitionFindOptions(
  "readings",
  "bridge-9876",
  selector,
)

findResult, response, err := service.PostPartitionFind(postPartitionFindOptions)
if err != nil {
  panic(err)
}

b, _ := json.MarshalIndent(findResult, "", "  ")
fmt.Println(string(b))

The previous Go example requires the following import block:

import (
   "encoding/json"
   "fmt"
   "github.com/IBM/cloudant-go-sdk/cloudantv1"
)

All Go examples require the service object to be initialized. For more information, see the API documentation's Authentication section for examples.

Finding the infrastructure ID for a device

The two queries we've yet to perform are shown in the following list:

  1. Readings for all time for a specific device.
  2. Readings for today for a specific device.

For these two queries, you need to find the partition for the devices by using the global by-device index. Then, you can query the individual partition for readings. While you might use a global index to query for the readings for individual devices, the mapping from device to infrastructure ID is highly cache-able. It never changes! With this approach, you can mostly use the cheaper and more efficient partitioned query for most requests.

Using a global index to query directly for device readings might be more efficient if caching the device to infrastructure mapping doesn't work well for a specific application.

To find the relevant partition for a device, you query the by-device view, sending the device ID as the key:

curl -X POST "$SERVICE_URL/readings/_design/infrastructure-mapping/_view/by-device' -H 'Content-Type: application/json' --data '{
"keys": ["device-123456"], "limit": 1 
}'

For more language examples that show querying a global view, see the Query a MapReduce view in API Docs.

The previous command returns the following response:

{
  "total_rows": 5,
  "offset": 0,
  "rows": [
    {
      "id": "bridge-9876:device-123456-20181211T11:13:24.123456Z",
      "key": "device-123456",
      "value": "bridge-9876"
    }
  ]
}

The partition key is in the value field of the included row: bridge-9876.

Querying for all results for a device

To get the results for a device, you issue a partition query for the device within the bridge-9876 partition. A standard IBM Cloudant Query selector is used, as if one were issuing a global query.

The partition is embedded in the HTTP path when you issue the request to IBM Cloudant:

curl -X POST "$SERVICE_URL/readings/_partition/bridge-9876/_find" -H 'Content-Type: 
application/json' --data '{
   "selector": {
      "deviceID": {
         "$eq": "device-123456"
      }
   }
}'
import com.ibm.cloud.cloudant.v1.Cloudant;
import com.ibm.cloud.cloudant.v1.model.FindResult;
import com.ibm.cloud.cloudant.v1.model.PostPartitionFindOptions;
import com.ibm.cloud.cloudant.v1.model.Selector;

import java.util.HashMap;
import java.util.Map;

Cloudant service = Cloudant.newInstance();

Map equalWithDeviceID = new HashMap<>();
equalWithDeviceID.put("$eq", "device-123456");

Selector selector = new Selector();
selector.put("deviceID", equalWithDeviceID);

PostPartitionFindOptions findOptions =
    new PostPartitionFindOptions.Builder()
        .db("readings")
        .partitionKey("bridge-9876")
        .selector(selector)
        .build();

FindResult response =
    service.postPartitionFind(findOptions).execute()
        .getResult();

System.out.println(response);
import { CloudantV1 } from '@ibm-cloud/cloudant';

const service = CloudantV1.newInstance({});

const selector: CloudantV1.Selector = {
  deviceID: {'$eq': 'device-123456'}
}
service.postPartitionFind({
  db: 'readings',
  partitionKey: 'bridge-9876',
  selector: selector
}).then(response => {
  console.log(response.result);
});
from ibmcloudant.cloudant_v1 import CloudantV1

service = CloudantV1.new_instance()

response = service.post_partition_find(
  db='readings',
  partition_key='bridge-9876',
  selector={'deviceID': {'$eq': 'device-123456'}}
).get_result()

print(response)
selector := map[string]interface{}{
"deviceID": map[string]string{
"$eq": "device-123456",
},
}

postPartitionFindOptions := service.NewPostPartitionFindOptions(
"readings",
"bridge-9876",
selector,
)

findResult, response, err := service.PostPartitionFind(postPartitionFindOptions)
if err != nil {
panic(err)
}

b, _ := json.MarshalIndent(findResult, "", "  ")
fmt.Println(string(b))

The previous Go example requires the following import block:

import (
   "encoding/json"
   "fmt"
   "github.com/IBM/cloudant-go-sdk/cloudantv1"
)

All Go examples require the service object to be initialized. For more information, see the API documentation's Authentication section for examples.

Querying for recent results for a device

To get the results for a device, you issue a partition query for the device within the bridge-9876 partition. The selector is only slightly more complicated, but still the same as an equivalent global query.

Query for recent results assuming today is 13 December 2018

The partition is embedded in the HTTP path when issuing the request to IBM Cloudant:

curl -X POST "$SERVICE_URL/readings/_partition/bridge-9876/_find" -H 'Content-Type: application/json' --data '{
   "selector": {
      "deviceID": {
         "$eq": "device-123456"
      },
      "ts": {
         "$gte": "20181213"
      }
   }
}'
import com.ibm.cloud.cloudant.v1.Cloudant;
import com.ibm.cloud.cloudant.v1.model.FindResult;
import com.ibm.cloud.cloudant.v1.model.PostPartitionFindOptions;
import com.ibm.cloud.cloudant.v1.model.Selector;

import java.util.HashMap;
import java.util.Map;

Cloudant service = Cloudant.newInstance();

Map equalWithDeviceID = new HashMap<>();
equalWithDeviceID.put("$eq", "device-123456");

Map greaterThanOrEqualWithTs = new HashMap<>();
greaterThanOrEqualWithTs.put("$gte", "20181213");

Selector selector = new Selector();
selector.put("deviceID", equalWithDeviceID);
selector.put("ts", greaterThanOrEqualWithTs);

PostPartitionFindOptions findOptions =
    new PostPartitionFindOptions.Builder()
        .db("readings")
        .partitionKey("bridge-9876")
        .selector(selector)
        .build();

FindResult response =
    service.postPartitionFind(findOptions).execute()
        .getResult();

System.out.println(response);
import { CloudantV1 } from '@ibm-cloud/cloudant';

const service = CloudantV1.newInstance({});

const selector: CloudantV1.Selector = {
  deviceID: {'$eq': 'device-123456'},
  ts: {'$gte': '20181213'}
}
service.postPartitionFind({
  db: 'readings',
  partitionKey: 'bridge-9876',
  selector: selector
}).then(response => {
  console.log(response.result);
});
from ibmcloudant.cloudant_v1 import CloudantV1

service = CloudantV1.new_instance()

response = service.post_partition_find(
  db='readings',
  partition_key='bridge-9876',
  selector={
      'deviceID': {'$eq': 'device-123456'},
      'ts': {'$gte': '20181213'}
  }
).get_result()

print(response)
selector := map[string]interface{}{
"deviceID": map[string]string{
"$eq": "device-123456",
},
"ts": map[string]string{
"$gte": "20181213",
},
}

postPartitionFindOptions := service.NewPostPartitionFindOptions(
"readings",
"bridge-9876",
selector,
)

findResult, response, err := service.PostPartitionFind(postPartitionFindOptions)
if err != nil {
panic(err)
}

b, _ := json.MarshalIndent(findResult, "", "  ")
fmt.Println(string(b))

The previous Go example requires the following import block:

import (
   "encoding/json"
   "fmt"
   "github.com/IBM/cloudant-go-sdk/cloudantv1"
)

All Go examples require the service object to be initialized. For more information, see the API documentation's Authentication section for examples.