Ingest Streams¶
API v2 Coming Soon
API v2 introduces improvements to ingest operations. The current release uses API v1. See Migrating from API v1 for a preview of what's changing.
RBAC Requirement
Managing ingest streams requires specific roles when RBAC is enabled: creating, pausing, and deleting requires Architect or DataEngineer, while viewing requires Analyst, Architect, or DataEngineer. See OIDC and RBAC Setup for details.
Overview¶
An ingest stream connects a potentially infinite stream of incoming events to Quine and prepares the data for the streaming graph. Within the ingest stream, an ingest query, written in Cypher, updates the streaming graph nodes and edges as data is received.
Working with data in Quine is a two-step process:
- Load a stream of data into the graph with an ingest stream: event-driven data
- Monitor the graph for results and act with a standing query: data-driven events
All data sources require an "ingest stream". The ingest stream is the first opportunity we have to affect the data. In its most basic form, an ingest stream maps JSON to Quine nodes directly. The following query creates a new Quine node and applies all of the properties from the incoming JSON payload. It then adds an "Event" label to help with organization.
MATCH (n) WHERE id(n) = idFrom($that) SET n = $that, n:Event
For guidance on designing graph structures and ingest queries, see Data Modeling and Query Design.
Hint
Select nodes, don't search for nodes. If your ingest query locates nodes not by following edges but by, for example, a WHERE condition that filters on a specific property value, executing the query requires scanning all nodes. If we detect that the query provided may entail this behavior, we log a warning:
Cypher query may contain full node scan; for improved performance, re-write without full node scan.
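As an illustration of the difference, here is a hedged sketch assuming each incoming record carries a userId field (the field and property names are hypothetical):

// Scans every node: the WHERE clause filters on a property value
MATCH (n) WHERE n.userId = $that.userId
SET n.lastSeen = $that.timestamp

// Selects one node directly by its deterministic ID
MATCH (n) WHERE id(n) = idFrom('user', $that.userId)
SET n.lastSeen = $that.timestamp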
Quine adds an idFrom function to Cypher that takes any number of arguments and deterministically produces a node ID from that data.
This is similar to a consistent-hashing approach where a collection of values are hashed together to produce a unique result that can be used for an ID.
Quine supports many different kinds of IDs (numbers, UUIDs, strings, tuples of values, and more); idFrom produces consistent results appropriate for the dataset regardless of which ID type is used.
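For example, an ingest query can derive a node ID from several values at once. In this hedged sketch, the constant 'customer' and the customerId field are hypothetical choices for a dataset of customer records:

MATCH (n)
WHERE id(n) = idFrom('customer', $that.customerId)
SET n = $that, n:Customer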
Quine parses JSON data into a graph structure according to the following assumptions (an example follows the list):
- Each JSON object is treated as a node in the graph.
- Nested objects are treated as separate nodes. In this case, the JSON field name is treated as the name of the outgoing edge.
- The field id is the default field defining the ID of the node. The name of this field is customizable and can be set with the string config setting at quine.json.id-field.
- The ID computed for each object must be a string that can be parsed by the IdProviderType set in the configuration. ID fields in the JSON that do not have the proper type result in an error returned from the API.
- Objects without any ID are assigned a random ID. Identical objects with no ID field may therefore produce multiple separate nodes, depending on the structure of the data provided.
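As an illustration of these assumptions, a hypothetical payload like the one below (field values invented) would produce two nodes, one for the outer object and one for the nested address object, connected by an outgoing edge named address:

{
  "id": "user-1",
  "name": "Ada",
  "address": { "id": "addr-7", "city": "London" }
}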
Ingesting Event-Driven Data¶
Data enters Quine from streaming data sources like Kafka, Kinesis, or even POSIX named pipes. These data sources are effectively infinite; Quine works with them as if they will never end. Other types of data sources are supported as well, like ordinary files in CSV, JSON, or other formats.
Each ingest stream performs four primary operations:
- Consume a stream of bytes - e.g. open a local file on disk, or connect to a Kafka topic.
- Delimit it into a sequence of finite byte arrays - e.g. use newlines to separate individual lines of a file; Kafka provides delimited records by design.
- Parse each byte array into an object - e.g. parse the bytes as JSON, or use a provided Protobuf schema to deserialize each record. Bytes are first decoded if the ingest specifies recordDecoders.
- Construct the graph with the ingest query - e.g. provide the parsed object as $that to a user-defined Cypher query, which creates any graph structure desired.
When a new ingest stream is configured, Quine will connect to the source and follow the steps described above to use the incoming data stream to update the internal graph. Not all ingest configurations require distinct choices for each step. For instance, a file ingest can define its source as a CSV (Comma Separated Values) file, and the line-delimiting and field parsing are done automatically.
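As a sketch of such a file ingest, the configuration below reads a CSV file and copies each row onto a node. The source type name File and the path and Csv format fields are assumptions here; consult the Create Ingest Stream API documentation for the exact schema.

{
  "name": "csv-file-ingest",
  "source": {
    "type": "File",
    "path": "/data/events.csv",
    "format": { "type": "Csv", "headers": true }
  },
  "query": "MATCH (n) WHERE id(n) = idFrom($that) SET n = $that"
}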
Each ingest stream is backpressured. When Quine is busy with intensive tasks downstream, or possibly waiting for the durable storage to finish processing, Quine will slow down the ingest stream so that it does not overwhelm other components. Backpressured ingest streams ensure maximum throughput while preserving stability of the overall system.
Data Sources¶
An ingest stream can receive data from the following data sources:
- Files and Named Pipes
- Kafka
- Kinesis (including KinesisKCL)
- SQS / SNS
- Standard Input
- S3 - Read files from Amazon S3 buckets
- Server Sent Events - Consume SSE streams from HTTP endpoints
- WebSocket - Connect to WebSocket servers for real-time data
- Reactive Stream - Subscribe to reactive stream publishers
Need help?
See Troubleshooting Ingest for help with missing data, slow ingests, and other common issues.
Ingest Stream Structure¶
An ingest stream requires a name, source, and query. The source specifies where data comes from and how to parse it, while the query is a Cypher statement that processes each record into the graph.
Quine Enterprise supports multiple types of ingest sources. Each source type has specific configuration options described in the API documentation.
For example, creating an ingest stream via Create Ingest Stream: POST /api/v2/ingests to read data from standard input and store each line as a node:
{
"name": "standardIn",
"source": {
"type": "StdInput",
"format": "Line",
"characterEncoding": "UTF-8"
},
"query": "MATCH (n) WHERE id(n) = idFrom($that) SET n.line = $that"
}
Quine Enterprise reads from standard input, passing each line into the Cypher query as the parameter $that. A unique node ID is generated using idFrom($that). Then, each line is stored as a line property associated with a new node in the streaming graph.
When creating an ingest stream via the API, you must provide a unique name that identifies the stream. For example, the above ingest stream is named standardIn to make it easier to reference in your application.
Alternatively, when creating an ingest stream via a recipe, Quine Enterprise automatically assigns a name to each stream using the format INGEST-# where the first ingest stream defined in the recipe is INGEST-1 and subsequent ingest streams are named in order with # counting up.
Here is the same ingest stream defined in a Recipe:
ingestStreams:
  - type: StdInput
    format: Line
    characterEncoding: UTF-8
    query: |-
      MATCH (n)
      WHERE id(n) = idFrom($that)
      SET n.line = $that
Record Decoding¶
An ingest may specify a list of decoders to support decoding and decompression. Decoders are applied per record, in the order they are specified.
- Decoding is applied in the order specified in the recordDecoders array.
- recordDecoders are supported for File, S3, Kafka, Kinesis, KinesisKCL, ServerSentEvents, and SQS sources.
- Decoding types currently supported are Base64, Gzip, and Zlib.
- The recordDecoders member is optional.
The following ingest stream handles records that were Gzip-compressed and then Base64 encoded; because decoders run in the order listed, Base64 is applied first, followed by Gzip:
{
"name": "kinesis-ingest",
"source": {
"type": "Kinesis",
"format": "Json",
"streamName": "my-stream",
"recordDecoders": ["Base64", "Gzip"]
},
"query": "CREATE ($that)"
}
Error Handling¶
Quine Enterprise provides granular control over error handling at both the record and stream level.
Record-Level Errors¶
Configure onRecordError to control behavior when individual records fail:
{
"onRecordError": {
"retrySettings": {
"minBackoff": 2000,
"maxBackoff": 20,
"randomFactor": 0.2,
"maxRetries": 6
},
"logRecord": true,
"deadLetterQueue": { ... }
}
}
| Setting | Description | Default |
|---|---|---|
| retrySettings | Retry failed records with configurable backoff | None |
| logRecord | Log failed records to application logs | true |
| deadLetterQueue | Route failures to a dead letter queue | None |
Retry Settings¶
| Setting | Type | Default | Description |
|---|---|---|---|
| minBackoff | int | 2000 | Minimum backoff between retries (ms) |
| maxBackoff | int | 20 | Maximum backoff between retries (seconds) |
| randomFactor | double | 0.2 | Jitter factor for backoff (0.0 - 1.0) |
| maxRetries | int | 6 | Maximum number of retry attempts |
Stream-Level Errors¶
Configure onStreamError to control behavior when the entire stream encounters errors:
{
"onStreamError": {
"type": "RetryStreamError",
"maxRetries": 5
}
}
| Type | Description |
|---|---|
| LogStreamError | Log the error and stop the stream |
| RetryStreamError | Retry stream connection with a retry limit |
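For comparison with the retry example above, here is a minimal sketch using the logging handler, assuming LogStreamError takes no additional settings:

{
  "onStreamError": {
    "type": "LogStreamError"
  }
}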
Dead Letter Queue¶
When records fail to process, route them to a dead letter queue for later analysis or reprocessing. Supported destinations include:
- Kafka - publish to a Kafka topic
- Kinesis - publish to a Kinesis stream
- SNS - publish to an SNS topic
- HTTP - POST to a webhook endpoint
- File - write to a local JSON file
- Stdout - write to standard output
- ReactiveStream - broadcast to a reactive stream endpoint
Configure dead letter queue settings in your ingest stream definition:
{
"name": "my-ingest",
"source": {
"type": "Kafka",
"format": "Json",
"topics": ["events"],
"bootstrapServers": "kafka:9092"
},
"query": "CREATE ($that)",
"onRecordError": {
"deadLetterQueueSettings": {
"destinations": [
{
"type": "Kafka",
"topic": "failed-records",
"bootstrapServers": "kafka:9092",
"outputFormat": {
"type": "JSON",
"withInfoEnvelope": true
}
}
]
}
}
}
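Because destinations is an array, it can presumably list more than one destination. The sketch below adds a Stdout destination alongside Kafka; whether Stdout accepts an outputFormat member is an assumption:

"deadLetterQueueSettings": {
  "destinations": [
    {
      "type": "Kafka",
      "topic": "failed-records",
      "bootstrapServers": "kafka:9092"
    },
    {
      "type": "Stdout",
      "outputFormat": { "type": "JSON", "withInfoEnvelope": true }
    }
  ]
}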
Output Formats¶
| Format | Configuration | Description |
|---|---|---|
| JSON | {"type": "JSON"} | Raw JSON output |
| JSON+Info | {"type": "JSON", "withInfoEnvelope": true} | JSON with error details and original record |
| Protobuf | {"type": "Protobuf", "schemaUrl": "...", "typeName": "..."} | Binary Protobuf serialization |
When withInfoEnvelope is true, failed records are wrapped with metadata:
{
"error": "Cypher execution failed: property 'id' is required",
"timestamp": "2024-01-15T10:30:00Z",
"ingestName": "my-ingest",
"originalRecord": { "name": "incomplete" }
}
Avro Format¶
Quine Enterprise supports native Apache Avro for efficient binary serialization.
{
"name": "avro-ingest",
"source": {
"type": "Kafka",
"format": {
"type": "Avro",
"schemaUrl": "http://schema-registry:8081/schemas/ids/1"
},
"topics": ["avro-events"],
"bootstrapServers": "kafka:9092"
},
"query": "CREATE ($that)"
}
The schemaUrl can point to:
- A Schema Registry URL
- An HTTP/HTTPS URL serving the Avro schema JSON
- A local file path containing the schema (see the sketch below)
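For the local-file case, the format block might look like the following sketch; the path is hypothetical, and whether a bare filesystem path or a file:// URL is expected is an assumption:

"format": {
  "type": "Avro",
  "schemaUrl": "/schemas/events.avsc"
}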
KinesisKCL¶
The Kinesis Client Library (KCL) 3.x integration provides enterprise-grade Kinesis consumption with:
- DynamoDB checkpointing - automatic progress tracking across restarts
- Lease management - graceful handoff between consumers
- Enhanced fan-out - dedicated throughput per consumer
- CloudWatch metrics - built-in monitoring
KCL Configuration¶
{
"name": "kcl-ingest",
"source": {
"type": "KinesisKCL",
"kinesisStreamName": "my-stream",
"applicationName": "my-app",
"format": "Json",
"credentials": {
"region": "us-west-2",
"accessKeyId": "AKIA...",
"secretAccessKey": "..."
},
"initialPosition": "Latest",
"numRetries": 3,
"checkpointSettings": {
"disableCheckpointing": false,
"maxBatchSize": 1000,
"maxBatchWaitMillis": 10000
},
"schedulerSourceSettings": {
"bufferSize": 1000,
"backpressureTimeoutMillis": 60000
}
},
"query": "CREATE ($that)"
}
| Setting | Type | Default | Description |
|---|---|---|---|
| kinesisStreamName | string | Required | Kinesis stream name |
| applicationName | string | Required | Used for DynamoDB table and CloudWatch namespace |
| initialPosition | string | Latest | Starting position: Latest, TrimHorizon, or AtTimestamp |
| numRetries | int | 3 | Number of retries for recoverable errors |
| credentials | object | None | AWS credentials (uses default provider if omitted) |
Initial Position Options¶
| Position | Description |
|---|---|
| Latest | Start from records added after subscription begins |
| TrimHorizon | Start from the oldest available record in the shard |
| AtTimestamp | Start from a specific timestamp (requires additional config) |
Checkpoint Settings¶
| Setting | Type | Default | Description |
|---|---|---|---|
| disableCheckpointing | boolean | false | Disable DynamoDB checkpointing |
| maxBatchSize | int | None | Maximum records per checkpoint batch |
| maxBatchWaitMillis | long | None | Maximum time between checkpoints (ms) |
Scheduler Source Settings¶
| Setting | Type | Default | Description |
|---|---|---|---|
| bufferSize | int | None | Internal buffer size (must be > 0) |
| backpressureTimeoutMillis | long | None | Timeout when backpressure is applied (ms) |
Retrieval Modes¶
KCL supports two retrieval modes:
Polling (default) - Standard GetRecords API:
{
"retrieval": {
"type": "Polling",
"maxRecords": 10000,
"idleTimeBetweenReadsInMillis": 1000
}
}
FanOut - Enhanced fan-out with dedicated throughput:
{
"retrieval": {
"type": "FanOut",
"consumerArn": "arn:aws:kinesis:us-west-2:123456789:stream/my-stream/consumer/my-consumer:1234567890"
}
}
Or create a new consumer automatically:
{
"retrieval": {
"type": "FanOut",
"consumerName": "my-new-consumer"
}
}
KCL vs Basic Kinesis¶
| Feature | Kinesis | KinesisKCL |
|---|---|---|
| Checkpointing | Manual | Automatic (DynamoDB) |
| Multiple consumers | Coordinate manually | Automatic lease management |
| Throughput | Shared | Dedicated (Enhanced Fan-Out) |
| Shard splits/merges | Manual handling | Automatic |
| Metrics | Application logs | CloudWatch integration |
Use the KinesisKCL source type for production workloads requiring high availability and reliable processing.
JavaScript Transformation¶
Pre-process records with JavaScript before executing your Cypher query. This enables data normalization, filtering, and enrichment without modifying the source.
{
"name": "transformed-ingest",
"source": {
"type": "Kafka",
"format": "Json",
"topics": ["raw-events"],
"bootstrapServers": "kafka:9092"
},
"query": "CREATE (n:Event $that)",
"transformation": {
"type": "JavaScript",
"function": "function transform(record) { record.processedAt = Date.now(); return record; }"
}
}
Transformation Use Cases¶
- Field normalization - standardize field names across sources
- Data enrichment - add computed fields
- Filtering - return null to skip records
- Format conversion - reshape nested structures
The JavaScript function receives the parsed record and must return the transformed record (or null to skip).
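For instance, a filtering transformation that skips records could look like the sketch below; the severity field is hypothetical:

"transformation": {
  "type": "JavaScript",
  "function": "function transform(record) { return record.severity === 'low' ? null : record; }"
}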
WebSocket File Upload¶
Stream files directly to Quine Enterprise via WebSocket with real-time progress feedback.
Configuration¶
Create a WebSocket file upload ingest that accepts file data:
{
"name": "ws-upload",
"source": {
"type": "WebSocketFileUpload",
"format": "Json"
},
"query": "CREATE ($that)"
}
Supported file formats:
| Format | Description |
|---|---|
| Line | Line-delimited text |
| JsonL | Newline-delimited JSON (JSON Lines) |
| Json | Single JSON array or object |
| Csv | Comma-separated values with headers |
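For example, a sketch of a CSV upload configuration, assuming the format is given as the bare string Csv just as Json is in the example above:

{
  "name": "ws-csv-upload",
  "source": {
    "type": "WebSocketFileUpload",
    "format": "Csv"
  },
  "query": "MATCH (n) WHERE id(n) = idFrom($that) SET n = $that"
}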
Progress Feedback¶
The server sends JSON messages during upload to provide feedback:
| Message Type | Description |
|---|---|
| Ack | Acknowledges receipt of data chunk |
| Progress | Reports number of records processed |
| Error | Reports parsing or processing errors |
{ "type": "Ack" }
{ "type": "Progress", "count": 100 }
{ "type": "Error", "message": "Parse error at line 50", "index": 50, "record": "..." }
The server buffers up to 8 WebSocket messages for backpressure handling.
Inspecting Ingest Streams via the API¶
Quine Enterprise exposes API endpoints to monitor and manage ingest streams while in operation.
| Operation | Endpoint |
|---|---|
| List all ingest streams | List Ingest Streams: GET /api/v2/ingests |
| Create ingest stream | Create Ingest Stream: POST /api/v2/ingests |
| Get ingest stream status | Ingest Stream Status: GET /api/v2/ingests/{name} |
| Pause ingest stream | Pause Ingest Stream: POST /api/v2/ingests/{name}/pause |
| Resume ingest stream | Unpause Ingest Stream: POST /api/v2/ingests/{name}/start |
| Delete ingest stream | Delete Ingest Stream: DELETE /api/v2/ingests/{name} |
For complete API documentation, see the REST API Reference.