Ingest Streams

API v2 Coming Soon

API v2 introduces improvements to ingest operations. The current release uses API v1. See Migrating from API v1 for a preview of what's changing.

RBAC Requirement

Managing ingest streams requires specific roles when RBAC is enabled: creating, pausing, and deleting requires Architect or DataEngineer, while viewing requires Analyst, Architect, or DataEngineer. See OIDC and RBAC Setup for details.

Overview

An ingest stream connects a potentially infinite stream of incoming events to Quine and prepares the data for the streaming graph. Within the ingest stream, an ingest query, written in Cypher, updates the streaming graph nodes and edges as data is received.

Working with data in Quine is a two-step process:

  1. Load a stream of data into the graph with an ingest stream: event-driven data
  2. Monitor the graph for results and act with a standing query: data-driven events

All data sources require an "ingest stream". The ingest stream is the first opportunity we have to affect the data. In its most basic form, an ingest stream maps JSON to Quine nodes directly. The following query creates a new Quine node and applies all of the properties from the incoming JSON payload. It then adds an "Event" label to help with organization.

MATCH (n) WHERE id(n) = idFrom($that) SET n = $that, n:Event

For guidance on designing graph structures and ingest queries, see Data Modeling and Query Design.

Hint

Select nodes, don't search for nodes. If your ingest query locates nodes not by traversing edges but by, for example, a WHERE condition that filters on a specific property value, executing it requires scanning all nodes. If Quine detects that a query may entail this behavior, it logs a warning:

Cypher query may contain full node scan; for improved performance, re-write without full node scan.
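
For example, consider an ingest that keeps one node per user (the userId and timestamp fields are illustrative). Filtering on a property value, as in the first query below, may trigger a full node scan; deriving the node ID with idFrom, as in the second, selects the node directly:

MATCH (n) WHERE n.userId = $that.userId SET n.lastSeen = $that.timestamp

MATCH (n) WHERE id(n) = idFrom('user', $that.userId) SET n.lastSeen = $that.timestamp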

Quine adds an idFrom function to Cypher that takes any number of arguments and deterministically produces a node ID from that data.

This is similar to a consistent-hashing approach, where a collection of values is hashed together to produce a unique result that can be used as an ID.

Quine supports many different kinds of IDs (numbers, UUIDs, strings, tuples of values, and more); idFrom produces consistent results appropriate for the dataset regardless of which ID type is used.
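
For example, a hypothetical ingest whose records carry a customerId field could derive a stable node ID by hashing a constant discriminator together with that field:

MATCH (n) WHERE id(n) = idFrom('customer', $that.customerId) SET n.name = $that.name

Every record with the same customerId then resolves to the same node, whichever ID type the graph is configured to use.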

Quine parses JSON data into a graph structure according to the following assumptions:

  • Each JSON object is treated as a node in the graph.
  • Nested objects are treated as separate nodes. In this case, the JSON field name is treated as the name of the outgoing edge.
  • The field id is the default field defining the ID of the node. The name of this field is customizable and can be set with the string config setting at quine.json.id-field.
  • The ID computed for each object must be a string that can be parsed by the IdProviderType set in the configuration. ID fields in JSON that do not have the proper type result in an error returned from the API.
  • Objects without any ID are assigned a random ID. Identical objects with no ID field may therefore result in multiple separate nodes being created, depending on the structure of the data provided.
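
As an illustration of these rules, consider the payload below (the field names are invented, and the example assumes an ID provider that accepts these string IDs):

{
  "id": "order-1001",
  "status": "shipped",
  "customer": {
    "id": "customer-42",
    "name": "Ada"
  }
}

The outer object becomes a node identified by order-1001, the nested object becomes a separate node identified by customer-42, and an outgoing edge named customer connects the two.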

Ingesting Event-Driven Data

Data enters Quine from streaming data sources like Kafka, Kinesis, or even POSIX named pipes. These data sources are effectively infinite; Quine works with them as if they will never end. Other types of data sources are supported as well, such as ordinary files in CSV, JSON, or other formats.

Each ingest stream performs four primary operations:

  1. Consume a stream of bytes - e.g. open a local file on disk, or connect to a Kafka topic.
  2. Delimit the stream into a sequence of finite byte arrays - e.g. use newlines to separate individual lines from a file; Kafka delimits records by design.
  3. Parse each byte array into an object - e.g. decode the bytes as a string and parse it as JSON, or use a provided Protobuf schema to deserialize each object. Bytes are first decoded if the ingest specifies recordDecoders.
  4. Construct the graph with the ingest query - e.g. provide the parsed object as $that to a user-defined Cypher query, which creates any graph structure desired.

When a new ingest stream is configured, Quine will connect to the source and follow the steps described above to use the incoming data stream to update the internal graph. Not all ingest configurations require distinct choices for each step. For instance, a file ingest can define its source as a CSV (Comma Separated Values) file, and the line-delimiting and field parsing are done automatically.
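
As a sketch of such a configuration, a file ingest reading a CSV might look like the following; the path is a placeholder, and the exact source and format field names may differ from the current API schema, so consult the API documentation:

{
    "name": "ordersFromCsv",
    "source": {
        "type": "File",
        "path": "/tmp/orders.csv",
        "format": { "type": "Csv", "headers": true }
    },
    "query": "MATCH (n) WHERE id(n) = idFrom($that) SET n = $that"
}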

Each ingest stream is backpressured. When Quine is busy with intensive tasks downstream, or possibly waiting for the durable storage to finish processing, Quine will slow down the ingest stream so that it does not overwhelm other components. Backpressured ingest streams ensure maximum throughput while preserving stability of the overall system.

Data Sources

An ingest stream can receive data from the following data sources:

  • Files and Named Pipes
  • Kafka
  • Kinesis (including KinesisKCL)
  • SQS / SNS
  • Standard Input
  • S3 - Read files from Amazon S3 buckets
  • Server Sent Events - Consume SSE streams from HTTP endpoints
  • WebSocket - Connect to WebSocket servers for real-time data
  • Reactive Stream - Subscribe to reactive stream publishers

Need help?

See Troubleshooting Ingest for help with missing data, slow ingests, and other common issues.

Ingest Stream Structure

An ingest stream requires a name, source, and query. The source specifies where data comes from and how to parse it, while the query is a Cypher statement that processes each record into the graph.

Quine Enterprise supports multiple types of ingest sources. Each source type has specific configuration options described in the API documentation.

For example, creating an ingest stream via Create Ingest Stream: POST /api/v2/ingests to read data from standard input and store each line as a node:

{
    "name": "standardIn",
    "source": {
        "type": "StdInput",
        "format": "Line",
        "characterEncoding": "UTF-8"
    },
    "query": "MATCH (n) WHERE id(n) = idFrom($that) SET n.line = $that"
}

Quine Enterprise reads from standard input, passing each line into the Cypher query as the parameter $that. A unique node ID is generated using idFrom($that). Then, each line is stored as a line property associated with a new node in the streaming graph.

When creating an ingest stream via the API, you must provide a unique name that identifies the stream. For example, the above ingest stream is named standardIn to make it easier to reference in your application.

Alternatively, when creating an ingest stream via a recipe, Quine Enterprise automatically assigns a name to each stream using the format INGEST-#, where the first ingest stream defined in the recipe is INGEST-1 and subsequent ingest streams are numbered in order.

Here is the same ingest stream defined in a Recipe:

ingestStreams:
  - type: StdInput
    format: Line
    characterEncoding: UTF-8
    query: |-
        MATCH (n)
        WHERE id(n) = idFrom($that)
        SET n.line = $that

Record Decoding

An ingest may specify a list of decoders to support decompression. Decoders are applied per record, in the order they are specified.

  • Decoding is applied in the order specified in the recordDecoders array.
  • recordDecoders are supported for File, S3, Kafka, Kinesis, KinesisKCL, ServerSentEvents, and SQS sources.
  • Decoding types currently supported are Base64, Gzip, and Zlib.
  • The recordDecoders member is optional.

The following ingest stream specifies that each record is Gzipped, then Base64 encoded:

{
  "name": "kinesis-ingest",
  "source": {
    "type": "Kinesis",
    "format": "Json",
    "streamName": "my-stream",
    "recordDecoders": ["Base64", "Gzip"]
  },
  "query": "CREATE ($that)"
}

Error Handling

Quine Enterprise provides granular control over error handling at both the record and stream level.

Record-Level Errors

Configure onRecordError to control behavior when individual records fail:

{
  "onRecordError": {
    "retrySettings": {
      "minBackoff": 2000,
      "maxBackoff": 20,
      "randomFactor": 0.2,
      "maxRetries": 6
    },
    "logRecord": true,
    "deadLetterQueue": { ... }
  }
}

| Setting | Description | Default |
| --- | --- | --- |
| retrySettings | Retry failed records with configurable backoff | None |
| logRecord | Log failed records to application logs | true |
| deadLetterQueue | Route failures to a dead letter queue | None |

Retry Settings

| Setting | Type | Default | Description |
| --- | --- | --- | --- |
| minBackoff | int | 2000 | Minimum backoff between retries (ms) |
| maxBackoff | int | 20 | Maximum backoff between retries (seconds) |
| randomFactor | double | 0.2 | Jitter factor for backoff (0.0 - 1.0) |
| maxRetries | int | 6 | Maximum number of retry attempts |

Stream-Level Errors

Configure onStreamError to control behavior when the entire stream encounters errors:

{
  "onStreamError": {
    "type": "RetryStreamError",
    "maxRetries": 5
  }
}

| Type | Description |
| --- | --- |
| LogStreamError | Log the error and stop the stream |
| RetryStreamError | Retry stream connection with a retry limit |
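
For example, to log the error and stop the stream instead of retrying, the policy reduces to the type alone (a minimal sketch, assuming LogStreamError requires no further settings):

{
  "onStreamError": {
    "type": "LogStreamError"
  }
}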

Dead Letter Queue

When records fail to process, route them to a dead letter queue for later analysis or reprocessing. Supported destinations include:

  • Kafka - publish to a Kafka topic
  • Kinesis - publish to a Kinesis stream
  • SNS - publish to an SNS topic
  • HTTP - POST to a webhook endpoint
  • File - write to a local JSON file
  • Stdout - write to standard output
  • ReactiveStream - broadcast to a reactive stream endpoint

Configure dead letter queue settings in your ingest stream definition:

{
  "name": "my-ingest",
  "source": {
    "type": "Kafka",
    "format": "Json",
    "topics": ["events"],
    "bootstrapServers": "kafka:9092"
  },
  "query": "CREATE ($that)",
  "onRecordError": {
    "deadLetterQueueSettings": {
      "destinations": [
        {
          "type": "Kafka",
          "topic": "failed-records",
          "bootstrapServers": "kafka:9092",
          "outputFormat": {
            "type": "JSON",
            "withInfoEnvelope": true
          }
        }
      ]
    }
  }
}

Output Formats

| Format | Configuration | Description |
| --- | --- | --- |
| JSON | {"type": "JSON"} | Raw JSON output |
| JSON+Info | {"type": "JSON", "withInfoEnvelope": true} | JSON with error details and original record |
| Protobuf | {"type": "Protobuf", "schemaUrl": "...", "typeName": "..."} | Binary Protobuf serialization |

When withInfoEnvelope is true, failed records are wrapped with metadata:

{
  "error": "Cypher execution failed: property 'id' is required",
  "timestamp": "2024-01-15T10:30:00Z",
  "ingestName": "my-ingest",
  "originalRecord": { "name": "incomplete" }
}
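
Following the table above, a Protobuf output format for a dead letter queue destination references a schema and a message type; the URL and type name below are placeholders:

{
  "outputFormat": {
    "type": "Protobuf",
    "schemaUrl": "http://example.com/schemas/failed-records.desc",
    "typeName": "com.example.FailedRecord"
  }
}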

Avro Format

Quine Enterprise supports native Apache Avro for efficient binary serialization.

{
  "name": "avro-ingest",
  "source": {
    "type": "Kafka",
    "format": {
      "type": "Avro",
      "schemaUrl": "http://schema-registry:8081/schemas/ids/1"
    },
    "topics": ["avro-events"],
    "bootstrapServers": "kafka:9092"
  },
  "query": "CREATE ($that)"
}

The schemaUrl can point to:

  • A Schema Registry URL
  • An HTTP/HTTPS URL serving the Avro schema JSON
  • A local file path containing the schema
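
For instance, an Avro format that loads its schema from a local file instead of a registry (the path is a placeholder):

"format": {
  "type": "Avro",
  "schemaUrl": "/opt/quine/schemas/events.avsc"
}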

KinesisKCL

The Kinesis Client Library (KCL) 3.x integration provides enterprise-grade Kinesis consumption with:

  • DynamoDB checkpointing - automatic progress tracking across restarts
  • Lease management - graceful handoff between consumers
  • Enhanced fan-out - dedicated throughput per consumer
  • CloudWatch metrics - built-in monitoring

KCL Configuration

{
  "name": "kcl-ingest",
  "source": {
    "type": "KinesisKCL",
    "kinesisStreamName": "my-stream",
    "applicationName": "my-app",
    "format": "Json",
    "credentials": {
      "region": "us-west-2",
      "accessKeyId": "AKIA...",
      "secretAccessKey": "..."
    },
    "initialPosition": "Latest",
    "numRetries": 3,
    "checkpointSettings": {
      "disableCheckpointing": false,
      "maxBatchSize": 1000,
      "maxBatchWaitMillis": 10000
    },
    "schedulerSourceSettings": {
      "bufferSize": 1000,
      "backpressureTimeoutMillis": 60000
    }
  },
  "query": "CREATE ($that)"
}

| Setting | Type | Default | Description |
| --- | --- | --- | --- |
| kinesisStreamName | string | Required | Kinesis stream name |
| applicationName | string | Required | Used for DynamoDB table and CloudWatch namespace |
| initialPosition | string | Latest | Starting position: Latest, TrimHorizon, or AtTimestamp |
| numRetries | int | 3 | Number of retries for recoverable errors |
| credentials | object | None | AWS credentials (uses default provider if omitted) |

Initial Position Options

| Position | Description |
| --- | --- |
| Latest | Start from records added after subscription begins |
| TrimHorizon | Start from the oldest available record in the shard |
| AtTimestamp | Start from a specific timestamp (requires additional config) |

Checkpoint Settings

| Setting | Type | Default | Description |
| --- | --- | --- | --- |
| disableCheckpointing | boolean | false | Disable DynamoDB checkpointing |
| maxBatchSize | int | None | Maximum records per checkpoint batch |
| maxBatchWaitMillis | long | None | Maximum time between checkpoints (ms) |

Scheduler Source Settings

| Setting | Type | Default | Description |
| --- | --- | --- | --- |
| bufferSize | int | None | Internal buffer size (must be > 0) |
| backpressureTimeoutMillis | long | None | Timeout when backpressure is applied (ms) |

Retrieval Modes

KCL supports two retrieval modes:

Polling (default) - Standard GetRecords API:

{
  "retrieval": {
    "type": "Polling",
    "maxRecords": 10000,
    "idleTimeBetweenReadsInMillis": 1000
  }
}

FanOut - Enhanced fan-out with dedicated throughput:

{
  "retrieval": {
    "type": "FanOut",
    "consumerArn": "arn:aws:kinesis:us-west-2:123456789:stream/my-stream/consumer/my-consumer:1234567890"
  }
}

Or create a new consumer automatically:

{
  "retrieval": {
    "type": "FanOut",
    "consumerName": "my-new-consumer"
  }
}

KCL vs Basic Kinesis

| Feature | Kinesis | KinesisKCL |
| --- | --- | --- |
| Checkpointing | Manual | Automatic (DynamoDB) |
| Multiple consumers | Coordinate manually | Automatic lease management |
| Throughput | Shared | Dedicated (Enhanced Fan-Out) |
| Shard splits/merges | Manual handling | Automatic |
| Metrics | Application logs | CloudWatch integration |

Use the KinesisKCL source type for production workloads that require high availability and reliable processing.

JavaScript Transformation

Pre-process records with JavaScript before executing your Cypher query. This enables data normalization, filtering, and enrichment without modifying the source.

{
  "name": "transformed-ingest",
  "source": {
    "type": "Kafka",
    "format": "Json",
    "topics": ["raw-events"],
    "bootstrapServers": "kafka:9092"
  },
  "query": "CREATE (n:Event $that)",
  "transformation": {
    "type": "JavaScript",
    "function": "function transform(record) { record.processedAt = Date.now(); return record; }"
  }
}

Transformation Use Cases

  • Field normalization - standardize field names across sources
  • Data enrichment - add computed fields
  • Filtering - return null to skip records
  • Format conversion - reshape nested structures

The JavaScript function receives the parsed record and must return the transformed record (or null to skip).
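
For example, a transformation that drops records missing a userId and normalizes a timestamp field might look like this (the field names are illustrative):

function transform(record) {
  // Skip records that cannot be attributed to a user
  if (!record.userId) {
    return null;
  }
  // Normalize the event time to epoch milliseconds
  record.eventTimeMs = Date.parse(record.eventTime);
  return record;
}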

WebSocket File Upload

Stream files directly to Quine Enterprise via WebSocket with real-time progress feedback.

Configuration

Create a WebSocket file upload ingest that accepts file data:

{
  "name": "ws-upload",
  "source": {
    "type": "WebSocketFileUpload",
    "format": "Json"
  },
  "query": "CREATE ($that)"
}

Supported file formats:

| Format | Description |
| --- | --- |
| Line | Line-delimited text |
| JsonL | Newline-delimited JSON (JSON Lines) |
| Json | Single JSON array or object |
| Csv | Comma-separated values with headers |

Progress Feedback

The server sends JSON messages during upload to provide feedback:

| Message Type | Description |
| --- | --- |
| Ack | Acknowledges receipt of data chunk |
| Progress | Reports number of records processed |
| Error | Reports parsing or processing errors |

{ "type": "Ack" }
{ "type": "Progress", "count": 100 }
{ "type": "Error", "message": "Parse error at line 50", "index": 50, "record": "..." }

The server buffers up to 8 WebSocket messages for backpressure handling.

Inspecting Ingest Streams via the API

Quine Enterprise exposes API endpoints to monitor and manage ingest streams while in operation.

| Operation | Endpoint |
| --- | --- |
| List all ingest streams | List Ingest Streams: GET /api/v2/ingests |
| Create ingest stream | Create Ingest Stream: POST /api/v2/ingests |
| Get ingest stream status | Ingest Stream Status: GET /api/v2/ingests/{name} |
| Pause ingest stream | Pause Ingest Stream: POST /api/v2/ingests/{name}/pause |
| Resume ingest stream | Unpause Ingest Stream: POST /api/v2/ingests/{name}/start |
| Delete ingest stream | Delete Ingest Stream: DELETE /api/v2/ingests/{name} |

For complete API documentation, see the REST API Reference.