AWS Kinesis Data Stream Support
Reading Records from Kinesis
thatDot Streaming Graph has full support for reading records from Kinesis Data Streams. How thatDot Streaming Graph interprets each record as graph-structured data is highly configurable via the REST API.
Example
In this example, we will register a multi-shard Kinesis stream of JSON objects (one JSON object per Kinesis record) as a data source, creating one node in the graph for each object.
Preparation
For the purposes of this tutorial, you will need a Kinesis data stream and credentials (an access key ID and secret access key) for an IAM User with the following privileges:
- kinesis:RegisterStreamConsumer
- kinesis:DeregisterStreamConsumer
- kinesis:SubscribeToShard
- kinesis:DescribeStreamSummary
- kinesis:DescribeStreamConsumer
- kinesis:GetShardIterator
- kinesis:GetRecords
- kinesis:DescribeStream
- kinesis:ListTagsForStream
For our example, we’ll assume we have such a user with access to the json-logs stream, with access key ID AKIAMYACCESSKEY and secret access key AWSScRtACCessKeyAWS/ScRtACCessKey. These credentials will be used to register the data source with thatDot Streaming Graph.
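As a sketch of how the privileges above might be granted, the following Python snippet builds an IAM policy document allowing those actions on the json-logs stream. The account ID 123456789012 and the function name kinesis_read_policy are hypothetical. Scoping the consumer-level actions (such as SubscribeToShard) to consumer ARNs under the stream is our assumption about a reasonable policy, not a policy published for thatDot Streaming Graph.

```python
import json

# Hypothetical account ID: replace with your own AWS account ID.
ACCOUNT_ID = "123456789012"
REGION = "us-west-2"
STREAM = "json-logs"

# The privileges listed above, as IAM action names.
KINESIS_ACTIONS = [
    "kinesis:RegisterStreamConsumer",
    "kinesis:DeregisterStreamConsumer",
    "kinesis:SubscribeToShard",
    "kinesis:DescribeStreamSummary",
    "kinesis:DescribeStreamConsumer",
    "kinesis:GetShardIterator",
    "kinesis:GetRecords",
    "kinesis:DescribeStream",
    "kinesis:ListTagsForStream",
]


def kinesis_read_policy(account_id: str, region: str, stream: str) -> dict:
    """Build an IAM policy document granting the listed actions on one stream."""
    stream_arn = f"arn:aws:kinesis:{region}:{account_id}:stream/{stream}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": KINESIS_ACTIONS,
                # Assumption: consumer-level actions (e.g. SubscribeToShard)
                # apply to consumer ARNs under the stream, so allow both.
                "Resource": [stream_arn, f"{stream_arn}/consumer/*"],
            }
        ],
    }


print(json.dumps(kinesis_read_policy(ACCOUNT_ID, REGION, STREAM), indent=2))
```

The printed document could then be attached to the IAM user, for example through the IAM console.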
Registering Kinesis as a data source
To register Kinesis as a data source with thatDot Streaming Graph, we describe our stream via the ingest REST API.
In this example, we’ll use the aforementioned Kinesis stream json-logs, hosted in the region us-west-2, and we’ll name the thatDot Streaming Graph ingest stream kinesis-logs. Our API request is therefore a POST to /api/v1/ingest/kinesis-logs with the following payload:
{
  "format": {
    "query": "CREATE ($that)",
    "type": "CypherJson"
  },
  "streamName": "json-logs",
  "parallelism": 2,
  "shardIds": [],
  "type": "KinesisIngest",
  "credentials": {
    "region": "us-west-2",
    "accessKeyId": "AKIAMYACCESSKEY",
    "secretAccessKey": "AWSScRtACCessKeyAWS/ScRtACCessKey"
  },
  "iteratorType": "TrimHorizon"
}
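The request can be issued from any HTTP client. This minimal sketch prepares it using only Python's standard library. The base URL http://localhost:8080 and the helper build_ingest_request are assumptions, so adjust them to your deployment.

```python
import json
import urllib.request
from urllib.parse import quote

# Hypothetical base URL: point this at your thatDot Streaming Graph REST API.
BASE_URL = "http://localhost:8080"

# The payload from the example above, as a Python dict.
PAYLOAD = {
    "format": {"query": "CREATE ($that)", "type": "CypherJson"},
    "streamName": "json-logs",
    "parallelism": 2,
    "shardIds": [],
    "type": "KinesisIngest",
    "credentials": {
        "region": "us-west-2",
        "accessKeyId": "AKIAMYACCESSKEY",
        "secretAccessKey": "AWSScRtACCessKeyAWS/ScRtACCessKey",
    },
    "iteratorType": "TrimHorizon",
}


def build_ingest_request(base_url: str, ingest_name: str, payload: dict) -> urllib.request.Request:
    """Prepare (but do not send) a POST to /api/v1/ingest/<ingest_name>."""
    url = f"{base_url.rstrip('/')}/api/v1/ingest/{quote(ingest_name, safe='')}"
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_ingest_request(BASE_URL, "kinesis-logs", PAYLOAD)
print(request.get_method(), request.full_url)
```

To actually send the request, pass it to urllib.request.urlopen(request) and inspect the response status.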
We pass an empty list of shard IDs to specify that thatDot Streaming Graph should read from all shards in the stream. To read only from particular shards, we would instead list the IDs of those shards.
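Restricting an ingest to particular shards changes only the shardIds field of the payload. This sketch uses a hypothetical helper, with_shards, to copy a payload and set that field. The shard IDs shown follow the usual Kinesis shardId-000000000000 naming but are placeholders for the IDs in your own stream.

```python
from __future__ import annotations

import copy


def with_shards(payload: dict, shard_ids: list[str] | None = None) -> dict:
    """Return a copy of an ingest payload restricted to the given shards.

    None or an empty list yields "shardIds": [], meaning "read from all shards".
    """
    updated = copy.deepcopy(payload)  # leave the caller's payload untouched
    updated["shardIds"] = list(shard_ids or [])
    return updated


base = {"type": "KinesisIngest", "streamName": "json-logs", "shardIds": []}
all_shards = with_shards(base)
two_shards = with_shards(base, ["shardId-000000000000", "shardId-000000000001"])
print(all_shards["shardIds"], two_shards["shardIds"])
```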
Because the Kinesis stream is filled with JSON records, we choose the CypherJson import format, which reads each record as a JSON object before passing it as a Map to a Cypher query.
The Cypher query can access this object as the parameter named that. Thus, our configured query CREATE ($that) creates a node for each JSON record with the same property structure as the record.
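The following is a simplified Python model of what CypherJson does with a single record, written only to illustrate the description above. It is not thatDot Streaming Graph's implementation, and the function cypher_json_parameters and the sample record are hypothetical.

```python
import json


def cypher_json_parameters(record_data: bytes) -> dict:
    """Model how a CypherJson ingest exposes one record to its query.

    The record bytes are decoded as a JSON object, which is bound to the
    query parameter named "that" (referenced as $that in Cypher).
    """
    return {"that": json.loads(record_data.decode("utf-8"))}


# A hypothetical Kinesis record payload from the json-logs stream.
record = b'{"level": "INFO", "service": "api", "latencyMs": 42}'
params = cypher_json_parameters(record)

# With CREATE ($that), the new node's properties mirror this map.
print(params["that"])
```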
In this example, the Kinesis stream's records are JSON objects, but thatDot Streaming Graph offers other options for interpreting records from a stream. These options are configurable via the same endpoint by specifying a different format in the JSON payload above.
Finally, we choose to read all records from the Kinesis stream, including records already present in the stream before the thatDot Streaming Graph data source was configured. To get this behavior, we use the TrimHorizon Kinesis iterator type. If we wished to read only records written to the Kinesis stream after setting up the thatDot Streaming Graph data source, we would use the Latest iterator type.
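The choice between the two iterator types can be captured in a small helper. In this sketch, iterator_type is hypothetical, and the mapping to the Kinesis API's TRIM_HORIZON and LATEST shard iterator types is our assumption about what these settings correspond to. If that correspondence holds, note that TRIM_HORIZON starts at the oldest record the stream still retains, so "all records" means all records within the stream's retention period.

```python
def iterator_type(include_existing_records: bool) -> str:
    """Hypothetical helper choosing the ingest's "iteratorType" value.

    "TrimHorizon" also reads records already in the stream;
    "Latest" reads only records written after the ingest starts.
    """
    return "TrimHorizon" if include_existing_records else "Latest"


# Assumed correspondence to the Kinesis API's ShardIteratorType values.
KINESIS_API_NAME = {"TrimHorizon": "TRIM_HORIZON", "Latest": "LATEST"}

payload = {"type": "KinesisIngest", "streamName": "json-logs"}
payload["iteratorType"] = iterator_type(include_existing_records=True)
print(payload["iteratorType"], "->", KINESIS_API_NAME[payload["iteratorType"]])
```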