AWS Kinesis Data Stream Support

Reading Records from Kinesis

Quine Enterprise fully supports reading records from Kinesis Data Streams. How Quine Enterprise interprets each record as graph-structured data is configurable via the REST API.

Example

In this example, we will register a multiple-shard Kinesis stream of JSON objects (one JSON object per Kinesis record) as a data source, creating a single node in the graph for each object.

Preparation

For the purposes of this tutorial, you will need a Kinesis data stream and credentials (an access key ID and secret access key) for an IAM User with the following privileges:

  • kinesis:RegisterStreamConsumer
  • kinesis:DeregisterStreamConsumer
  • kinesis:SubscribeToShard
  • kinesis:DescribeStreamSummary
  • kinesis:DescribeStreamConsumer
  • kinesis:GetShardIterator
  • kinesis:GetRecords
  • kinesis:DescribeStream
  • kinesis:ListTagsForStream
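
As a rough illustration, the permissions above could be collected into an IAM policy document like the one this Python sketch prints. The region, the account ID 123456789012, and the choice to scope the policy to the stream and its consumers are assumptions made for the example, not requirements stated by Quine Enterprise.

```python
import json

# The nine Kinesis actions listed above.
KINESIS_ACTIONS = [
    "kinesis:RegisterStreamConsumer",
    "kinesis:DeregisterStreamConsumer",
    "kinesis:SubscribeToShard",
    "kinesis:DescribeStreamSummary",
    "kinesis:DescribeStreamConsumer",
    "kinesis:GetShardIterator",
    "kinesis:GetRecords",
    "kinesis:DescribeStream",
    "kinesis:ListTagsForStream",
]


def build_policy(region, account_id, stream_name):
    """Build an IAM policy document granting the actions on one stream."""
    stream_arn = f"arn:aws:kinesis:{region}:{account_id}:stream/{stream_name}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": KINESIS_ACTIONS,
                # Assumption: cover the stream itself and any consumers
                # registered on it.
                "Resource": [stream_arn, f"{stream_arn}/consumer/*"],
            }
        ],
    }


# Placeholder region and account ID; substitute your own values.
policy = build_policy("us-west-2", "123456789012", "json-logs")
print(json.dumps(policy, indent=2))
```

The printed JSON can be attached to the IAM user through whichever tooling you normally use to manage policies.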

For our example, we’ll assume that such a user has access to the json-logs stream, with access key ID AKIAMYACCESSKEY and secret access key AWSScRtACCessKeyAWS/ScRtACCessKey. These credentials will be used to register the data source with Quine Enterprise.

Registering Kinesis as a data source

To register Kinesis as a data source to Quine Enterprise, we need to describe our stream via the ingest REST API.

For example, we’ll use the aforementioned Kinesis stream, named json-logs and hosted in the us-west-2 region, and we’ll give the Quine Enterprise ingest stream the name kinesis-logs. Our API request is therefore a POST to /api/v1/ingest/kinesis-logs with the following payload:

{
  "format": {
    "query": "CREATE ($that)",
    "type": "CypherJson"
  },
  "streamName": "json-logs",
  "parallelism": 2,
  "shardIds": [],
  "type": "KinesisIngest",
  "credentials": {
    "region": "us-west-2",
    "accessKeyId": "AKIAMYACCESSKEY",
    "secretAccessKey": "AWSScRtACCessKeyAWS/ScRtACCessKey"
  },
  "iteratorType": "TrimHorizon"
}
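
One way to send this request is sketched below using only the Python standard library. The base URL http://localhost:8080 and the helper names build_ingest_request and register_ingest are assumptions for illustration; adjust the address to wherever your Quine Enterprise instance is listening.

```python
import json
import urllib.request

# Assumption: Quine Enterprise is reachable at this address.
QUINE_URL = "http://localhost:8080"

# The payload shown above, as a Python dictionary.
payload = {
    "format": {"query": "CREATE ($that)", "type": "CypherJson"},
    "streamName": "json-logs",
    "parallelism": 2,
    "shardIds": [],
    "type": "KinesisIngest",
    "credentials": {
        "region": "us-west-2",
        "accessKeyId": "AKIAMYACCESSKEY",
        "secretAccessKey": "AWSScRtACCessKeyAWS/ScRtACCessKey",
    },
    "iteratorType": "TrimHorizon",
}


def build_ingest_request(base_url, ingest_name, body):
    """Build (but do not send) the POST request that registers an ingest stream."""
    return urllib.request.Request(
        url=f"{base_url}/api/v1/ingest/{ingest_name}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def register_ingest(base_url, ingest_name, body):
    """Send the request and return the HTTP status code of the response."""
    request = build_ingest_request(base_url, ingest_name, body)
    with urllib.request.urlopen(request) as response:
        return response.status
```

Calling register_ingest(QUINE_URL, "kinesis-logs", payload) against a running instance would submit the registration; building the request separately makes it easy to inspect the URL and body first.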

We pass in an empty list of shard IDs to specify that Quine Enterprise should read from all shards in the stream. If we wanted to only read from particular shards, we would instead list out the shard IDs from which Quine Enterprise should read.
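
As a small sketch of the second case, the helper below copies a payload and restricts it to specific shards. The helper name with_shards and the two shard IDs are hypothetical; use the shard IDs that Kinesis reports for your own stream. Only the fields relevant to shard selection are shown, and the remaining fields are the same as in the payload above.

```python
import copy

# Fields relevant to shard selection; the other fields of the payload
# above are omitted here for brevity.
base_payload = {
    "type": "KinesisIngest",
    "streamName": "json-logs",
    "shardIds": [],  # empty list: read from every shard in the stream
}


def with_shards(payload, shard_ids):
    """Return a copy of the payload that reads only from the given shards."""
    updated = copy.deepcopy(payload)
    updated["shardIds"] = list(shard_ids)
    return updated


# Hypothetical shard IDs, for illustration only.
two_shards = with_shards(
    base_payload, ["shardId-000000000000", "shardId-000000000001"]
)
print(two_shards["shardIds"])
```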

Because the Kinesis stream is filled with JSON records, we choose the CypherJson import format, which reads each record as a JSON object before passing it as a Map to a Cypher query.

The Cypher query can access this object through the query parameter $that. Thus, our configured query CREATE ($that) will create one node for each JSON record, with the same property structure as the JSON record.
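
To make the data flow concrete, here is a conceptual Python model of that step. It is not Quine Enterprise’s implementation; the function name bind_record and the sample record fields are invented for illustration. It only shows the idea of decoding one record as a JSON object and exposing it to the query under the name that.

```python
import json


def bind_record(raw):
    """Decode one record as a JSON object and bind it to the name 'that'."""
    value = json.loads(raw.decode("utf-8"))
    if not isinstance(value, dict):
        raise ValueError("expected one JSON object per Kinesis record")
    return {"that": value}


# Hypothetical record contents.
record = b'{"level": "INFO", "service": "auth", "message": "login ok"}'
params = bind_record(record)

# With the query CREATE ($that), the created node would carry these
# keys as its properties.
print(sorted(params["that"].keys()))
```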

In this example, we use a Kinesis stream populated with JSON objects as records, though Quine Enterprise offers other options for how to interpret records from a stream. These options are configurable via the same endpoint by using different formats in the above JSON payload.

Finally, we choose to read all records from the Kinesis stream, including records that were already present in the stream before the Quine Enterprise data source was configured. To get this behavior, we use the TrimHorizon Kinesis iterator type. If we wished to read only records written to the Kinesis stream after setting up the Quine Enterprise data source, we would instead use the Latest iterator type.
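
The choice between the two iterator types can be summarized in a short Python sketch. The helper name iterator_type is hypothetical; the two values are the ones described above.

```python
def iterator_type(include_existing_records):
    """Pick the iteratorType value for the ingest payload.

    TrimHorizon also reads records already in the stream;
    Latest reads only records written after the ingest is set up.
    """
    return "TrimHorizon" if include_existing_records else "Latest"


# Fragment of the ingest payload showing only the relevant fields.
replay_everything = {
    "type": "KinesisIngest",
    "iteratorType": iterator_type(True),
}
new_records_only = {
    "type": "KinesisIngest",
    "iteratorType": iterator_type(False),
}
print(replay_everything["iteratorType"], new_records_only["iteratorType"])
```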