Configuration

Configuration is supported by Typesafe Config, enabling multiple ways to pass in options. Most commonly, configuration is provided via either Java system properties (passed as command-line options) or via a HOCON config file. HOCON is a JSON-like format that is very flexible and human-readable. The reference config below is in HOCON format.


# Example of setting configuration via configuration file
java \
  -Dconfig.file=quine.conf \
  -jar streaming-graph.jar

# Example of overriding configuration via system properties
java \
  -Dquine.webserver.port=9000 \
  -Dquine.id.type=uuid-3 \
  -jar streaming-graph.jar

# Example of overriding configuration via environment variables
CONFIG_FORCE_quine_webserver_port=9000 \
CONFIG_FORCE_quine_id_type=uuid-5 \
java \
  -Dconfig.override_with_env_vars=true \
  -jar streaming-graph.jar

Differences from Quine

In general, the configuration for thatDot Streaming Graph is very similar to the configuration for Quine Open Source. Streaming Graph’s complete documented configuration reference is located here: Config Ref Manual. Streaming Graph exposes a configuration surface that is nearly a superset of Quine’s.

Streaming Graph also includes some additional configuration sections that are not present in Quine:

  • cluster: This section is used to configure the cluster membership and communication settings. Its full breadth is documented in the config reference, but special attention should be given to the cluster joining configuration.

The quine.cluster.cluster-join configuration block specifies how the cluster members will be discovered. The type is either dns-entry or static-seed-addresses. If using dns-entry, a name should also be provided that resolves to a set of A records, one for each cluster member. If using static-seed-addresses, a seed-addresses list should be provided.

At minimum, a typical Kubernetes-based deployment will use dns-entry cluster joining with the following configuration:

quine.cluster {
  target-size = 4 # the number of members who must occupy positions in the cluster for the graph to be considered available
  cluster-join {
    type = "dns-entry" # Use the A records for a DNS name to find cluster members
    name = "streaminggraph" # The DNS name to use
  }
}

A typical configuration for an EC2-based deployment using static seed addresses would look like this:

quine.cluster {
  target-size = 4 # the number of members who must occupy positions in the cluster for the graph to be considered available
  cluster-join {
    type = "static-seed-addresses" # Use the specified, statically-known seed addresses to find cluster members
    seed-addresses = [ # note that not all cluster members need to be specified as seeds
      {
        address = "10.0.5.1" # hostname or IP at which a seed member can be found
        port = 17750
      },
      {
        address = "10.0.1.12"
        port = 17750
      }
    ]
  }
}

In lieu of a full cluster-join config block, environment variables may be used: QUINE_SEED_DNS can be set instead of cluster-join.name when using dns-entry cluster joining, and QUINE_SEED_ADDRESSES can be set instead of cluster-join.seed-addresses when using static seed addresses (formatted as a comma-separated list of address:port pairs like 10.0.5.1:17750,10.0.1.12:17750).
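
For example, the seed values from the configurations above could be passed through environment variables instead (hostnames and addresses are illustrative):

# DNS-entry joining via environment variable
QUINE_SEED_DNS=streaminggraph \
java -jar streaming-graph.jar

# Static seed addresses via environment variable
QUINE_SEED_ADDRESSES=10.0.5.1:17750,10.0.1.12:17750 \
java -jar streaming-graph.jar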

Memory Configuration

thatDot Streaming Graph caches nodes in memory as necessary. The in-memory-soft-node-limit and in-memory-hard-node-limit settings, along with the shard-count configuration, determine how many nodes can be cached at a time, and therefore how much heap space the JVM needs available for processing. The shard count defaults to 4 and does not typically need to be changed. Each shard in a cluster member retains its own cache of in-memory nodes. The soft limit (defaulting to 10000) determines the minimum size of this cache, and the hard limit (defaulting to 75000) determines its maximum size. The cache uses the flexible range between these values for nodes that are being put to sleep while other nodes are being rehydrated into memory (i.e., the difference between these values determines how many nodes can be going to sleep and waking up at the same time). The default settings allow between 40,000 and 300,000 nodes in memory at a time per cluster member. Depending on the use case and the average memory footprint of a node, these values can be adjusted up or down to make the best use of the memory on the machine and the JVM heap.
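
As a sketch, a cluster member with extra heap available might roughly double the per-shard cache (illustrative values; with the default 4 shards, this allows between 80,000 and 600,000 nodes in memory per member):

quine {
  # per-shard minimum cache size (default 10000)
  in-memory-soft-node-limit = 20000

  # per-shard maximum cache size (default 75000); must be higher than the soft limit
  in-memory-hard-node-limit = 150000
}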

Note

The heap space requirement is primarily a function of the soft and hard node limit settings. Beyond that, leaving overhead for the operating system to manage direct memory requests helps preserve the performance of the host server.

To avoid Garbage Collection (GC) pauses in the JVM heap, it is recommended that you set the memory allocated to Streaming Graph to a fixed size if possible. You can do this by setting -Xms and -Xmx to the same value, which discourages the JVM from dynamically resizing the heap. Resizing the heap triggers a full GC, which can mean a lengthy pause (during which everything in the app is completely locked), depending on the size of the heap. Keep in mind that larger heaps take longer to garbage-collect, so simply allocating as much memory as possible can negatively affect performance. For this reason, it is not recommended to set the heap size larger than 16GB; 12GB tends to be a good starting point for large graph implementations with high ingest requirements.
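
For example, a launch command pinning the heap to a fixed 12GB might look like the following (the heap size is illustrative and should be sized to your workload):

java \
  -Xms12g \
  -Xmx12g \
  -Dconfig.file=quine.conf \
  -jar streaming-graph.jar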

Network traffic can also use memory. This is typically a small amount and is well managed by the system, but in some extreme cases (like high-volume data without proper data locality), this memory use can add up to something non-trivial, especially if the heap is using all available memory on the system or other processes are competing for memory. This traffic typically uses off-heap memory, and in the worst case, not leaving some memory available for off-heap usage can lead to the OS killing the process. It is recommended that the physical memory on a host be 25% to 33% larger than the -Xms and -Xmx heap setting. This leaves enough off-heap memory available for these other operations.
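
A rough worked example of that sizing guideline:

  12 GB heap (-Xms12g / -Xmx12g)
+ 3 GB to 4 GB overhead (25% to 33%) for off-heap and OS use
= 15 GB to 16 GB of physical memory on the host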

Data Pipeline Security Recommendations

Data pipelines in thatDot Streaming Graph are flexible enough to support almost any use case, but with that flexibility comes potential for misuse. To keep Streaming Graph, its data, and its environment as secure as possible, we recommend you follow these best practices:

  1. Use the latest version of Streaming Graph. We regularly release updates with security fixes.
  2. Deploy Streaming Graph behind a reverse proxy that performs TLS termination, user authentication, and HTTP request logging. This will allow you to control access to your Streaming Graph instance and gives you more information about how and when your graph is being accessed.
  3. Configure your data source according to that data source’s best practices. For example, if using a Kafka data source, keep your Kafka servers up to date and use the kafkaProperties field in the Streaming Graph ingest configuration to enforce TLSv1.3 encryption between Streaming Graph and your Kafka cluster (see the sketch after this list).
  4. Do not pass unsanitized data to any Cypher procedures (functionality invoked with the CALL syntax). This can lead to query injection attacks (via procedures like cypher.doIt and cypher.do.case) or server-side request forgery (via procedures like loadJsonLines).
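
Expanding on item 3, a minimal sketch of the kafkaProperties portion of a Kafka ingest definition is shown below. The property names are standard Kafka client settings; the truststore path and password are placeholders, and the remaining ingest fields are elided:

"kafkaProperties": {
  "security.protocol": "SSL",
  "ssl.enabled.protocols": "TLSv1.3",
  "ssl.truststore.location": "/path/to/truststore.jks",
  "ssl.truststore.password": "changeit"
}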

Reference Documentation

Uncommented values are the defaults, unless otherwise noted. Unexpected configuration keys or values in the quine block will cause an error to be reported at startup.

An underscore _ is used to indicate a required property with no default value. There are none of these in the default configuration.

quine {

  # webserver binding configuration
  webserver {
    # whether the webserver should be enabled
    enabled = true

    # Hostname or address of the interface to which the HTTP server should
    # be bound - 0.0.0.0 means "all interfaces"
    # There are two special values which are interpreted dynamically:
    #   1.) "<getHostAddress>" uses the host IP found at runtime
    #   2.) "<getHostName>" uses the host DNS name found at runtime
    address = "0.0.0.0"

    # port to which the HTTP server should be bound
    # setting to `0` will choose an available port at random.
    port = 8080

    # Whether the webserver should perform TLS termination
    # this is inferred to be no/false by default, unless keystore information is provided
    # via the `SSL_KEYSTORE_PATH` and `SSL_KEYSTORE_PASSWORD` environment variables. If this
    # is set to `yes/true` but the environment variables are not set, standard java system properties
    # such as `-Djavax.net.ssl.keyStore` and `-Djavax.net.ssl.keyStorePassword` may be used to configure
    # the keystore.
    # When enabled, the webserver will serve HTTPS traffic on the configured `webserver.port`
    # instead of HTTP. The TLS name used will be based on `webserver-advertise`, if provided,
    # or will be inferred from the `webserver.address` if not.
    # We recommend using a reverse proxy in front of Streaming Graph for TLS termination instead, as
    # it provides more flexibility and better performance.
    use-tls = no
  }
  # (optional) Configuration to use when advertising this server
  # (e.g., canonical address), if different than bind
  # configuration (e.g., when deployed behind a reverse proxy).
  # webserver-advertise {
    # Hostname or address using which the application should generate
    # user-facing hyperlinks to itself. This should be uniquely
    # resolvable from the end-users' client.
    # address = "localhost"

    # port (on `address`) via which the HTTP server can be reached
    # port = 8080
  # }

  cluster {
    # An arbitrary name to use for this cluster. It is used for actor
    # addresses and will appear in logs. All members of the same cluster
    # must share the same cluster name.
    name = quine-cluster

    # Minimum number of cluster members which must join before the
    # cluster is operational. This value is ignored on all nodes other
    # than the seed. Default is `1`
    target-size = 1

    # Duration determining how long to wait before an unreachable member
    # is removed from the cluster. A short value will heal a cluster
    # faster, at the expense of prematurely ejecting a member which might
    # become reachable again. This value should be set to a number of
    # seconds that is NOT LOWER than:  `10 * log(cluster_size)`  where
    # the `cluster_size` includes the number of hot-spares.
    # If not set, this value is estimated automatically.
    # member-down-timeout = 10 seconds

    # Duration a vacating member is given to terminate of its own accord
    # (once it has been given permission to leave) before it is actively downed.
    member-leave-timeout = 2 seconds

    # The clustering service will be bound to a specific address and port.
    # The combination of address and port serves to uniquely identify
    # each cluster member. This is also the address/port where incoming
    # traffic from other cluster members is directed.
    this-member {
      # Hostname or interface IP address to which the clustering service
      # is bound.
      # There are two special values which are interpreted dynamically:
      #   1.) "<getHostAddress>" uses the host IP found at runtime
      #   2.) "<getHostName>" uses the host DNS name found at runtime
      address = <getHostAddress>

      # The port to which the clustering service is bound.
      # setting to `0` will choose an available port at random.
      port = 25520

      # Optionally define a specific position in the cluster, which might
      # be overridden by the cluster. Cluster position is 0-indexed.
      # Setting to a negative number will let the cluster assign the
      # position of this member. If the specified position is already
      # occupied, this member will be assigned to another location or
      # used as a hot-spare.
      preferred-position = -1

      # When the cluster enters an Operating state, this value determines
      # whether this particular member will automatically resume any ingest
      # streams that were previously running on a member formerly at this
      # cluster position. This is relevant for restarts of the entire cluster
      # and when a single member takes over for another
      # failed member. This setting will be retained if this member joins the
      # cluster as a spare, so that when/if it is needed, this setting will
      # apply no matter which position it fills in the cluster.
      restore-ingest = true
    }

    # Seeds define the address/port of at least one member of the cluster.
    # Any member of the cluster can function as a seed node (including
    # hot-spares). It is strongly recommended to specify multiple seed
    # nodes; if a member goes down and restarts, it will contact the seed
    # nodes listed here. If a seed node restarts, it will need to contact
    # other seed nodes in order to rejoin the cluster.
    cluster-join = {
      type = static-seed-addresses
      seed-addresses = []
    }
    # Example seed-addresses entry:
    # {
    #   # Hostname or IP address where the cluster seed node is accessed.
    #   # It must be uniquely resolvable on the network. The default value
    #   # assumes this-member is the seed.
    #   address = localhost
    #
    #   # Port where the cluster seed node can be accessed.
    #   port = 25520
    # }
    # Seed addresses may also be passed in as a comma-separated list of host:port pairs in the
    # $QUINE_SEED_ADDRESSES environment variable. If the port is left off, it defaults to 25520.

    # Alternatively, a DNS name (e.g. `quine` in this example) that resolves to one or
    # more Streaming Graph hosts can be used as the cluster discovery mechanism.
    # This will cause the Pekko Management HTTP server to be started on port 7626 by default.
    # See https://pekko.apache.org/docs/pekko-management/1.0.0/pekko-management.html for more
    # information on the Pekko Management HTTP server, including configuration options.
    # cluster-join = {
    #   type = dns-entry
    #   name = quine
    # }
    # The DNS entry to resolve can also be specified with the $QUINE_SEED_DNS environment variable.
  }

  # configuration for the id-assignment scheme the application should use.
  # This must be the same on all clustered hosts
  id {
    # one of [uuid-3, uuid-4, uuid-5, long, byte-array, uuid]
    # - uuid-3:     generate UUIDv3s according to the RFC specification
    # - uuid-4:     generate UUIDs labelled as v4, with id() and strId()
    #               returning random UUIDs, and idFrom returning
    #               deterministic UUIDs with version 4 identifying bytes
    # - uuid-5:     generate UUIDv5s according to the RFC specification
    # - long:       generate random integer IDs in the range
    #               [-(2^53-1), 2^53-1] -- these may be safely used as
    #               IEEE double-precision floating-point values without
    #               loss of precision. This id scheme is not appropriate
    #               for large-scale datasets because of the high
    #               likelihood of a collision
    # - byte-array: generate unstructured byte arrays as IDs
    # - uuid:       generate UUIDs with a mix of versions and namespaces
    type = uuid

    # whether the id scheme should be extended with a host-aware
    # partitioning schema. When "true", ids will be prefixed with a
    # "partition" key, and two IDs with the same partition key will
    # always be managed by the same shard
    partitioned = false

    # for uuid-5 and uuid-3 configuration, a UUID namespace may also be
    # set. This must be the same on all clustered hosts
    # namespace = "00000000-0000-0000-0000-000000000000"
  }

  # Selects the order edges between nodes are returned in queries
  # one of [reverse-insertion, unordered]
  # reverse-insertion means the edges are returned in the reverse
  # of the order they were added (that is, from newest to oldest).
  edge-iteration = reverse-insertion

  # (optional) The number of nodes in a shard's cache before that shard
  # will begin to expire nodes from its cache.
  in-memory-soft-node-limit = 10000

  # (optional) A limit to the total number of nodes in a shard's cache.
  # Attempts to create a node that would exceed this limit will return
  # an error. This value must always be higher than
  # `in-memory-soft-node-limit`
  in-memory-hard-node-limit = 75000

  # configuration for which data to save about nodes and when to do so
  persistence {
    # whether to save node journals. "true" uses more disk space and
    # enables more functionality, such as historical queries
    journal-enabled = true

    # one of [on-node-sleep, on-node-update, never]. When to save a
    # snapshot of a node's current state, including any DistinctId Standing
    # Queries registered on the node
    snapshot-schedule = on-node-sleep

    # whether only a single snapshot should be retained per-node. If
    # false, one snapshot will be saved at each timestamp against which
    # a historical query is made
    snapshot-singleton = false

    # when to save Standing Query partial result (only applies for the
    # `MultipleValues` mode -- `DistinctId` Standing Queries always save
    # when a node saves a snapshot, regardless of this setting)
    standing-query-schedule = on-node-sleep

    # whether effects in-memory occur before or after updates are confirmed
    # persisted to disk.
    # Possible values: memory-first, persistor-first
    effect-order = persistor-first
  }

  # storage backend / "persistor" configuration. There are several
  # possible "type"s, non-default options are below (commented out)
  store {
    # store data in a local filesystem using RocksDB. This is not
    # supported in a multi-host cluster
    type = rocks-db

    # base folder in which RocksDB data will be stored
    filepath = "quine.db"

    # whether to create any directories in "filepath" that do not yet
    # exist
    create-parent-dir = no

    # whether to use a write-ahead log.
    write-ahead-log = on

    # whether to force all writes to be fully confirmed to disk. This
    # is substantially slower, but maintains data integrity even under
    # power loss (write-ahead-log is enough to maintain integrity due
    # to process crashes).
    sync-all-writes = off

    # if set, the number of nodes for which to optimize node creation
    # latency
    # bloom-filter-size =
  }
  # store {
  #   # store data in an Apache Cassandra instance
  #   type = cassandra
  #
  #   # "host:port" strings at which Cassandra nodes can be accessed from
  #   # the application
  #   endpoints = [
  #     "localhost:9042"
  #   ]
  #
  #   # the keyspace to use
  #   keyspace = quine
  #
  #   # whether the application should create the keyspace if it does not
  #   # yet exist
  #   should-create-keyspace = true
  #
  #   # whether the application should create tables in the keyspace if
  #   # they do not yet exist. Note that creating namespaces will create
  #   # tables in the keyspace regardless of this setting.
  #   should-create-tables = true
  #
  #   # how many copies of each datum the Cassandra cluster should retain
  #   replication-factor = 1
  #
  #   # how many hosts must agree on a datum for Quine to consider that
  #   # datum written/read
  #   write-consistency = LOCAL_QUORUM
  #   read-consistency = LOCAL_QUORUM
  #
  #   # passed through to Cassandra
  #   local-datacenter = "datacenter1"
  #
  #   # how long to wait before considering a write operation failed
  #   write-timeout = "10s"
  #
  #   # how long to wait before considering a read operation failed
  #   read-timeout = "10s"
  #
  #   # if set, the number of nodes for which to optimize node creation
  #   # latency
  #   # bloom-filter-size =
  # }
  # store {
  #   # store data in a memory-mapped local file using MapDB. This is not
  #   # supported in a multi-host cluster
  #   type = map-db
  #
  #   # base filename from which MapDB filenames will be created. For
  #   # example, "quine.db", "quine.db.part.3", etc. If omitted,
  #   # a file will be requested from the OS and removed on shutdown.
  #   # filepath = _
  #
  #   # whether to create any directories in "filepath" that don't yet exist
  #   create-parent-dir = no
  #
  #   # how many files to use. MapDB performance slows dramatically above
  #   # around 2GB per file
  #   number-partitions = 1
  #
  #   # whether to use a write-ahead log. Does not support Windows hosts.
  #   write-ahead-log = off
  #
  #   # if write-ahead-log = true, how often to commit the write ahead log
  #   commit-interval = "10s"
  #
  #   # if set, the number of nodes for which to optimize node creation
  #   # latency
  #   # bloom-filter-size =
  # }
  # store {
  #   # store data in a ClickHouse cluster. Each thatDot Streaming Graph cluster member
  #   # must connect to a different ClickHouse shard, and the number of Streaming Graph
  #   # cluster members must be the same as the number of ClickHouse shards. Additionally,
  #   # the ClickHouse cluster should be preconfigured with the quine database schema.
  #   type = click-house
  #
  #   # URL of the ClickHouse shard to which this Streaming Graph cluster member should connect
  #   url = "http://localhost:8123"
  #
  #   # name of the ClickHouse database to which to connect
  #   database = "quine"
  #
  #   # ClickHouse username
  #   username = "quine"
  #
  #   # ClickHouse password
  #   password = "quine"
  #
  #   # if set, the number of nodes for which to optimize node creation
  #   # latency
  #   # bloom-filter-size =
  # }
  # store {
  #   # do not store any data, only use the temporary node cache
  #   # all writes to the persistor will be a no-op.
  #   type = empty
  # }
  # store {
  #   # Use in-memory maps to simulate a local persistor.
  #   type = in-memory
  # }

  # where metrics collected by the application should be reported
  metrics-reporters = [
    {
      # one of [jmx, csv, influxdb, slf4j]
      # jmx will report metrics as namespaced MBeans. Other alternatives
      # are listed (commented out) below
      type = jmx
    }
    # {
    #   # create a csv file for each reported metric
    #   type = csv
    #
    #   # required by csv - the interval at which new rows will be
    #   # written to the CSV file (for example, 200ms)
    #   period = _
    #
    #   # required by csv - the directory in which the csv files should
    #   # be created and written
    #   log-directory = _
    # }
    # {
    #   # report metrics to an influxdb (version 1) database
    #   type = influxdb
    #
    #   # required by influxdb - the interval at which new records will
    #   # be written to the database
    #   period = _
    #
    #   # connection information for the influxdb database
    #   database = metrics
    #   scheme = http
    #   host = localhost
    #   port = 8086
    #
    #   # authentication information for the influxdb database. Both
    #   # fields may be omitted
    #   # user =
    #   # password =
    # }
    # {
    #   # log metrics via an slf4j logger
    #   type = slf4j
    #
    #   # required by slf4j - the interval at which new records will be
    #   # logged
    #   period = _
    #
    #   # logger to which metrics will be logged
    #   logger-name = metrics
    # }
  ]

  # Startup and shutdown timeout for the Quine Application
  # The system will throw an error and exit if any component required
  # to start or shut down Quine takes longer than this time
  timeout = 2 m


  # the property on a node reserved to store that node's labels. This must be
  # the same on all cluster members. It is not recommended to change this after
  # data has been added to the graph, as it will change the behavior of some
  # queries that rely on labels or properties, which may make them inconsistent
  # with queries run before the change
  labels-property = "__LABEL"

  # the minimum amount of time a node must stay in the cache after
  # being updated
  decline-sleep-when-write-within = 100 ms

  # the minimum amount of time a node must stay in the cache after
  # being accessed
  decline-sleep-when-access-within = 0 ms

  # nodes will wait up to this amount of time before processing messages
  # when at-time is in the future (this occurs when there is a difference
  # in the system clocks across members of the cluster)
  max-catch-up-sleep = 2000 ms

  # whether the application should log its current config at startup
  dump-config = no

  bolt {
    # whether the BOLT protocol should be enabled
    enabled = false

    # one of [optional, required, disabled]. Whether TLS should be used
    # for BOLT connections. See the security section to configure TLS
    # certificates
    encryption = optional

    # The interface to which the bolt service should be bound.
    # There are two special values which are interpreted dynamically:
    #   1.) "<getHostAddress>" uses the host IP found at runtime
    #   2.) "<getHostName>" uses the host DNS name found at runtime
    address = "0.0.0.0"

    # the port on which the BOLT protocol handler should listen
    # setting to `0` will choose an available port at random.
    port = 7687
  }

  # configuration of TLS certificates used by BOLT
  security {
    # the SSL keystore to use. If omitted, this will default to the
    # keystore specified by the javax.net.ssl.keyStore system property.
    # keystore =

    # the password for the SSL keystore to use. If omitted, this will
    # default to the password specified by the
    # javax.net.ssl.keyStorePassword system property
    # password =
  }

  # which metrics are enabled and their configurations
  metrics {
    # whether to enable debug metrics (i.e., metrics whose collection may slow down
    # the operation of thatDot Streaming Graph)
    enable-debug-metrics = no
  }

  # configuration for the log sanitizer
  log-config {
    # whether to hide potentially sensitive information in logs (e.g., values
    # derived from ingested records)
    show-unsafe = no

    # whether to show exceptions in logs. These may contain sensitive information
    # and may include verbose stacktraces. The stack trace depth limit (or, number
    # of function calls captured and logged as part of a stack trace) may be set via
    # the standard `-XX:MaxJavaStackTraceDepth` JVM option
    show-exceptions = no

    # the redaction method to use when hiding sensitive information
    redactor {
        # must be "redact-hide", which replaces potentially sensitive information
        # with a placeholder string "**REDACTED**"
        type = redact-hide
    }
  }

  # trial permissions (Streaming Graph trial version only)
  #trial {
  #  email = "test_trial_version@thatdot.com"
  #  api-key = TRIAL_API_KEY
  #}
}

Additional Pekko Configuration

Some Pekko-specific configuration options are useful for operating thatDot Streaming Graph.

Pekko Coordinated Shutdown

The cluster shutdown protocol has timeouts for each phase which may need to be adjusted based on the current workload. For more information on Pekko Coordinated Shutdown in general, see the Pekko documentation on the subject: https://pekko.apache.org/docs/pekko/current/coordinated-shutdown.html

Especially when ingesting from a Kafka source, it may be necessary to increase the timeout of the before-cluster-shutdown phase. Depending on batch size and number of ingests, the cluster may need more than the default 5 seconds to perform cleanup actions for shutdown. This should be taken into account when configuring the shutdown grace period for orchestration tools such as Docker and Kubernetes.

Warning message example:

akka.actor.CoordinatedShutdown - Coordinated shutdown phase [before-cluster-shutdown] timed out after 5000 milliseconds

Example Configuration:

pekko.coordinated-shutdown.phases {
    before-cluster-shutdown.timeout = "10 seconds"
}
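
If you raise this timeout, make sure the orchestration layer's grace period is at least as long. For example (illustrative values):

# Docker: wait up to 30 seconds before force-killing the container
docker stop --time=30 streaming-graph

# Kubernetes: pod spec excerpt
terminationGracePeriodSeconds: 30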