# NATS

[NATS](https://nats.io/) is a lightweight, high-performance open-source messaging system. Its persistence layer, [JetStream](https://docs.nats.io/nats-concepts/jetstream), adds durable streams, message replay, and work queues. Nilus consumes NATS+JetStream as a **stream source** (`spec.type: stream`): an ephemeral JetStream consumer pulls records in configurable batches and writes them to the configured sink.

Nilus connects to a JetStream cluster directly through the `nats+jetstream://` URI scheme. There is currently no DataOS depot variant for NATS.

{% hint style="info" %}
Although JetStream delivers messages as a continuous stream, Nilus pulls them in **micro-batches**. Tune `batch_size` and `timeout` to match the sink's write throughput and the traffic profile of the source subject.
{% endhint %}

## Requirements

Connectivity and credentials must both be in place before the pipeline can run.

### Connectivity

* The Nilus runtime must reach the NATS server on the configured TCP port (default `4222`).
* JetStream must be enabled on the server, and the chosen subject must be captured by a JetStream stream that the consumer can read.
* The credentials in the URI must grant: connect, subscribe to the chosen subject, and read access on the JetStream stream.
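Before deploying, you can confirm the first two requirements from the machine that runs Nilus. The following is a minimal sketch and not a DataOS tool: `probe_nats` and `jetstream_flag` are hypothetical helpers. It relies on the NATS client protocol, where the server greets every new TCP connection with an `INFO {json}` line. It also assumes the server advertises a `jetstream` field in that payload when JetStream is enabled. If the field is missing, treat the result as inconclusive rather than as a failure.

```python
import json
import socket


def probe_nats(host: str = "localhost", port: int = 4222, timeout: float = 5.0) -> dict:
    """Connect over TCP and return the JSON payload of the server's INFO greeting.

    Raises OSError if the port is unreachable, and ConnectionError (a subclass
    of OSError) if the peer does not greet like a NATS server.
    """
    with socket.create_connection((host, port), timeout=timeout) as sock:
        with sock.makefile("rb") as reader:
            greeting = reader.readline()  # e.g. b'INFO {"server_id":...}\r\n'
    if not greeting.startswith(b"INFO "):
        raise ConnectionError(f"unexpected greeting: {greeting[:40]!r}")
    return json.loads(greeting[len(b"INFO "):])


def jetstream_flag(info: dict):
    """True/False if the server advertised JetStream, None if the field is absent."""
    return info.get("jetstream")
```

This probe checks network reachability and the server greeting only. Credentials and subject permissions are verified later, when the pipeline itself connects.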

### Authentication

NATS supports several authentication modes; pick whichever the server is set up for.

| Mode                | URI form                                                                                                       |
| ------------------- | -------------------------------------------------------------------------------------------------------------- |
| Username & password | `nats+jetstream://{NATS_USERNAME}:{NATS_PASSWORD}@<host>:<port>?...`                                           |
| Static token        | `nats+jetstream://<host>:<port>?token={NATS_TOKEN}&...` (or `nats+jetstream://{NATS_TOKEN}@<host>:<port>?...`) |
| NKeys seed          | `nats+jetstream://<host>:<port>?nkeys_seed={NATS_NKEYS_SEED}&...`                                              |

{% hint style="info" %}
Project credentials and seeds through `spec.use.projection` instead of inlining literal values in YAML.
{% endhint %}
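The three authentication modes differ only in where the secret appears in the address. The sketch below is hypothetical: `build_nats_address` is not part of Nilus. It composes an address in each documented form and rejects a mix of modes, which is our reading of the alternatives in the URI format. Curly braces are left unescaped so `{PLACEHOLDER}` references to projected secrets survive intact. Userinfo credentials are not percent-encoded, which is one more reason to use placeholders rather than literal secrets.

```python
from urllib.parse import urlencode


def build_nats_address(host, port, subject, *, username=None, password=None,
                       token=None, nkeys_seed=None, **extra):
    """Compose a nats+jetstream:// address that uses at most one auth mode.

    `extra` holds other query parameters such as durable, batch_size or timeout.
    """
    modes = [name for name, value in (("userinfo", username), ("token", token),
                                      ("nkeys_seed", nkeys_seed)) if value is not None]
    if len(modes) > 1:
        raise ValueError(f"pick a single auth mode, got {modes}")
    if password is not None and username is None:
        raise ValueError("a password needs a username")

    params = {"subject": subject, **extra}
    if token is not None:
        params["token"] = token
    if nkeys_seed is not None:
        params["nkeys_seed"] = nkeys_seed

    userinfo = ""
    if username is not None:
        userinfo = username + (f":{password}" if password is not None else "") + "@"
    # safe="{}" keeps placeholders such as {NATS_TOKEN} readable for substitution.
    return f"nats+jetstream://{userinfo}{host}:{port}?{urlencode(params, safe='{}')}"
```

For example, `build_nats_address("nats.prod", 4222, "orders.created", token="{NATS_TOKEN}", batch_size=1000, timeout=30)` yields the same address as the token example later on this page.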

### Required parameters

| Parameter      | Required | Default | Description                                                                                    |
| -------------- | -------- | ------- | ---------------------------------------------------------------------------------------------- |
| `subject`      | Yes      | -       | NATS subject the JetStream consumer filters on. Supplied in the URI query string.              |
| `source_table` | Yes      | -       | Used as the JetStream **stream name** the consumer attaches to. Passed under `source.options`. |

### Optional parameters

| Parameter     | Default     | Description                                                                                                                          |
| ------------- | ----------- | ------------------------------------------------------------------------------------------------------------------------------------ |
| `host`        | `localhost` | NATS server hostname. Provided in the URI authority.                                                                                 |
| `port`        | `4222`      | NATS server port. Provided in the URI authority.                                                                                     |
| `durable`     | -           | Name to use for a durable JetStream consumer. Set this when you want the consumer (and offsets) to persist across pipeline restarts. |
| `batch_size`  | `100`       | Maximum number of messages pulled per micro-batch.                                                                                   |
| `timeout`     | `10`        | Maximum seconds Nilus waits to fill a batch before flushing.                                                                         |
| `run_in_loop` | `false`     | When `true`, the stream service stays running and continues pulling new batches indefinitely.                                        |
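`batch_size` and `timeout` interact: a batch is flushed as soon as it is full or the wait limit expires, whichever happens first. The following is a local simulation of that rule using `queue.Queue` as a stand-in for the subject. It is not Nilus code and it does not call any NATS client, and `pull_batch` is a hypothetical name.

```python
import queue
import time


def pull_batch(source: "queue.Queue", batch_size: int = 100, timeout: float = 10.0) -> list:
    """Collect up to batch_size messages, flushing early once timeout seconds pass."""
    batch = []
    deadline = time.monotonic() + timeout
    while len(batch) < batch_size:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # time is up: flush whatever has arrived
        try:
            batch.append(source.get(timeout=remaining))
        except queue.Empty:
            break  # nothing else arrived before the deadline
    return batch
```

On a busy subject, batches fill up and `timeout` never matters. On a quiet subject, each pull waits the full `timeout` and may flush a partial or empty batch.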

### URI format

```
nats+jetstream://[<credentials>@]<host>:<port>?subject=<subject>[&durable=<name>][&batch_size=<n>][&timeout=<s>][&token=<token>|&nkeys_seed=<seed>]
```
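The address packs several settings into one string. The hypothetical parser below (`parse_nats_address` is illustrative and is not Nilus's own parser) shows where each part lives and applies the defaults from the parameter tables above when a part is omitted.

```python
from urllib.parse import parse_qs, urlsplit

# Defaults copied from the "Optional parameters" table on this page.
DEFAULTS = {"host": "localhost", "port": 4222, "batch_size": 100, "timeout": 10}


def parse_nats_address(address: str) -> dict:
    """Split a nats+jetstream:// address into host, credentials and query settings."""
    parts = urlsplit(address)
    if parts.scheme != "nats+jetstream":
        raise ValueError(f"unexpected scheme: {parts.scheme!r}")

    # parse_qs returns lists; keep the last value of each parameter.
    query = {key: values[-1] for key, values in parse_qs(parts.query).items()}
    if "subject" not in query:
        raise ValueError("missing required 'subject' query parameter")

    return {
        "host": parts.hostname or DEFAULTS["host"],
        "port": parts.port or DEFAULTS["port"],
        "username": parts.username,  # basic auth user, or a token in userinfo form
        "password": parts.password,
        "subject": query["subject"],
        "durable": query.get("durable"),
        "batch_size": int(query.get("batch_size", DEFAULTS["batch_size"])),
        "timeout": float(query.get("timeout", DEFAULTS["timeout"])),
        "token": query.get("token"),
        "nkeys_seed": query.get("nkeys_seed"),
    }
```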

## Core concepts

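* **Sequence-based consumption.** JetStream assigns every message a stream sequence number, analogous to a Kafka offset, and tracks each consumer's position by sequence. A pipeline attached to a durable consumer therefore resumes after the last delivered record when it restarts. An ephemeral consumer does not keep its position across restarts.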
* **Ephemeral vs. durable consumers.** By default, Nilus creates an ephemeral JetStream consumer per pipeline. Set `durable=<name>` to attach to (or create) a durable consumer so offsets survive pipeline restarts.
* **Filter subject.** The JetStream consumer is filtered by the `subject` parameter, so a single stream can feed multiple Nilus pipelines on different subjects.
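* **Acknowledgement policy.** Nilus uses the `AckNone` acknowledgement policy, which treats messages as acknowledged as soon as they are delivered (similar to auto-commit), together with the `instant` replay policy. This trades delivery guarantees for throughput. If the sink must not contain duplicates, deduplicate on the sink side with `incremental_strategy: merge` and a `primary_key`.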
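The difference between ephemeral and durable consumers is easiest to see in a toy model. `ToyStream` and `ToyConsumer` are hypothetical in-memory classes, not the NATS client or server. They assume two behaviours described on this page: a recreated ephemeral consumer replays from the start of the stream, and under `AckNone`, delivery alone advances a consumer's position.

```python
class ToyStream:
    """In-memory stand-in for a JetStream stream (hypothetical)."""

    def __init__(self):
        self.messages = []           # index i holds stream sequence i + 1
        self.durable_positions = {}  # durable name -> last delivered sequence

    def publish(self, data) -> int:
        self.messages.append(data)
        return len(self.messages)    # sequence assigned to the new message

    def consumer(self, durable=None) -> "ToyConsumer":
        """Create a consumer, as a pipeline (re)start would."""
        start = self.durable_positions.get(durable, 0) if durable else 0
        return ToyConsumer(self, durable, start)


class ToyConsumer:
    def __init__(self, stream: ToyStream, durable, position: int):
        self.stream, self.durable, self.position = stream, durable, position

    def fetch(self, batch_size: int) -> list:
        end = min(self.position + batch_size, len(self.stream.messages))
        batch = self.stream.messages[self.position:end]
        self.position = end  # AckNone: delivery alone advances the position
        if self.durable:
            self.stream.durable_positions[self.durable] = end  # survives restarts
        return batch
```

Restarting with `consumer("nilus-orders")` picks up where the previous run stopped. Restarting with `consumer()` delivers the first messages again, which matches the replay symptom listed under Troubleshooting.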

## Source options

| Option         | Required | Description                                       | Passed in                     |
| -------------- | -------- | ------------------------------------------------- | ----------------------------- |
| `subject`      | Yes      | NATS subject to subscribe to.                     | `source.address` query string |
| `source_table` | Yes      | JetStream stream name.                            | `source.options`              |
| `durable`      | No       | Durable consumer name.                            | `source.address` query string |
| `batch_size`   | No       | Messages per micro-batch.                         | `source.address` query string |
| `timeout`      | No       | Seconds to wait for a batch.                      | `source.address` query string |
| Auth params    | No       | `token`, `nkeys_seed`, or basic-auth credentials. | `source.address`              |
| `run_in_loop`  | No       | Keep the consumer running after the first batch.  | `source.options`              |
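The "Passed in" column is a common source of mistakes: some options belong in the address query string, while others go under `source.options`. Below is a hypothetical pre-flight check. `check_source` is not part of Nilus, and it takes the `source` block as an already-parsed dict. It flags missing required options and options placed in the wrong location according to the table above.

```python
from urllib.parse import parse_qs, urlsplit

# Placement per the "Source options" table on this page.
ADDRESS_PARAMS = {"subject", "durable", "batch_size", "timeout", "token", "nkeys_seed"}
OPTIONS_KEYS = {"source_table", "run_in_loop"}


def check_source(source: dict) -> list:
    """Return a list of problems with a `source` block; an empty list means none found."""
    problems = []
    query = parse_qs(urlsplit(source.get("address", "")).query)
    options = source.get("options") or {}

    if "subject" not in query:
        problems.append("address is missing the required 'subject' query parameter")
    if "source_table" not in options:
        problems.append("options is missing the required 'source_table' (JetStream stream name)")
    for key in sorted(OPTIONS_KEYS.intersection(query)):
        problems.append(f"'{key}' belongs under source.options, not in the address")
    for key in sorted(ADDRESS_PARAMS.intersection(options)):
        problems.append(f"'{key}' belongs in the source.address query string, not under options")
    return problems
```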

### Examples

**Local development cluster, no auth**

```yaml
source:
  address: nats+jetstream://localhost:4222?subject=orders.created&batch_size=500&timeout=10
  options:
    source_table: orders-stream
    run_in_loop: true
```

**Username and password, durable consumer**

```yaml
source:
  address: nats+jetstream://{NATS_USERNAME}:{NATS_PASSWORD}@nats.prod:4222?subject=orders.created&durable=nilus-orders&batch_size=1000&timeout=30
  options:
    source_table: orders-stream
    run_in_loop: true
```

**Token authentication**

```yaml
source:
  address: nats+jetstream://nats.prod:4222?subject=orders.created&token={NATS_TOKEN}&batch_size=1000&timeout=30
  options:
    source_table: orders-stream
    run_in_loop: true
```

**NKeys seed**

```yaml
source:
  address: nats+jetstream://nats.prod:4222?subject=orders.created&nkeys_seed={NATS_NKEYS_SEED}&batch_size=1000&timeout=30
  options:
    source_table: orders-stream
    run_in_loop: true
```

## Sink options

| Option                 | Required                           | Description                                                                                 |
| ---------------------- | ---------------------------------- | ------------------------------------------------------------------------------------------- |
| `dest_table`           | Yes                                | Destination object (`schema.table` for warehouses, fully-qualified path for object stores). |
| `incremental_strategy` | Yes                                | Write strategy: `append`, `replace`, or `merge`. Streaming pipelines normally use `append`. |
| `primary_key`          | When `incremental_strategy: merge` | Column(s) used to deduplicate.                                                              |
| `create_table`         | No                                 | Auto-create the destination if missing. Default: `true`.                                    |

### Example sink

```yaml
sink:
  address: dataos://orders-lakehouse?purpose=rw
  options:
    dest_table: analytics.orders_stream
    incremental_strategy: append
```

## Sample Nilus config

```yaml
name: nats-orders-stream
version: v1alpha
type: nilus
tags:
  - nilus-stream
description: Stream NATS+JetStream orders into the lakehouse.
spec:
  type: stream
  compute: universe-compute
  source:
    address: nats+jetstream://nats.prod:4222?subject=orders.created&durable=nilus-orders&batch_size=1000&timeout=30&nkeys_seed={NATS_NKEYS_SEED}
    options:
      source_table: orders-stream
      run_in_loop: true
  sink:
    address: dataos://orders-lakehouse?purpose=rw
    options:
      dest_table: analytics.orders_stream
      incremental_strategy: append
```
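Configs like the one above can be checked against the best practices on this page before deployment. The following is a hypothetical lint. `lint_stream_config` is not a DataOS tool, and it expects the YAML to be loaded into a dict already. It warns about four things: a missing `run_in_loop`, the absence of a durable consumer, `merge` without a `primary_key`, and credentials written inline instead of as `{PLACEHOLDER}` references.

```python
from urllib.parse import parse_qs, urlsplit


def lint_stream_config(config: dict) -> list:
    """Return best-practice warnings for a parsed Nilus stream config."""
    warnings = []
    spec = config.get("spec", {})
    source = spec.get("source", {})
    sink_options = spec.get("sink", {}).get("options", {})
    parts = urlsplit(source.get("address", ""))
    query = parse_qs(parts.query)

    if not source.get("options", {}).get("run_in_loop"):
        warnings.append("run_in_loop is not true: the pipeline exits after the first pull")
    if "durable" not in query:
        warnings.append("no durable consumer: offsets will not survive a restart")
    if sink_options.get("incremental_strategy") == "merge" and not sink_options.get("primary_key"):
        warnings.append("incremental_strategy: merge needs a primary_key")

    # Secrets should be {PLACEHOLDER} references to projected values, not literals.
    secrets = [parts.password, *query.get("token", []), *query.get("nkeys_seed", [])]
    if any(s and not (s.startswith("{") and s.endswith("}")) for s in secrets):
        warnings.append("a credential is inlined; project it through spec.use.projection")
    return warnings
```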

## Best practices

* Use **dedicated subjects** for ingestion so the JetStream consumer is not woken by unrelated traffic.
* Set `durable=<name>` on production pipelines so offsets survive pipeline restarts.
* Tune `batch_size` and `timeout` together: larger batches improve sink throughput, larger timeouts reduce empty pulls on quiet subjects.
* Always set `run_in_loop: true` for long-running stream services. Without it, Nilus exits after the first pull.
* Monitor JetStream consumer pending messages and sequence lag from the NATS server side; sustained lag is the leading indicator of an under-sized pipeline.
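"Sustained lag" is easier to alert on with an explicit rule. The following is a minimal, hypothetical heuristic (`sustained_lag` is not a Nilus or NATS function). It takes periodic samples of the consumer's pending-message count, such as the `num_pending` value that JetStream reports in consumer info, and flags a backlog that stays non-zero and does not shrink across a window. The default window of 5 samples is an arbitrary assumption.

```python
def sustained_lag(pending_samples: list, window: int = 5) -> bool:
    """True when the last `window` samples are all non-zero and the backlog did not shrink."""
    if len(pending_samples) < window:
        return False  # not enough history to judge
    recent = pending_samples[-window:]
    return all(p > 0 for p in recent) and recent[-1] >= recent[0]
```

A backlog that grows (`[10, 20, 30, 40, 50]`) is flagged. A backlog that drains (`[50, 40, 30, 20, 10]`) or one that repeatedly returns to zero is not.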

## Troubleshooting

| Symptom                              | Likely cause                                                               | Resolution                                                                                             |
| ------------------------------------ | -------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------ |
| `no responders available for stream` | The JetStream stream named in `source_table` does not exist, or JetStream is not enabled on the server. | Enable JetStream and create the stream (or correct the name) before starting the pipeline.             |
| `permissions violation`              | The authenticated user is missing subscribe or JetStream read permissions. | Update the NATS authorization to allow subscribe on the subject and read on the stream.                |
| Pipeline exits after the first batch | `run_in_loop` not set.                                                     | Set `run_in_loop: true` under `source.options`.                                                        |
| Old data is replayed on restart      | Ephemeral consumer was recreated and replay starts from the beginning.     | Set a stable `durable=<name>` so the consumer (and its offsets) persist.                               |
| Token / nkeys auth fails             | Secret not projected into the URI.                                         | Project the credential through `spec.use.projection` and reference it as a `{PLACEHOLDER}` in the URI. |

## Related docs

* [Understanding Stream Data Movement](/concepts/resources/nilus/stream.md)
* [Understanding Stream Pipeline Config](https://github.com/moderndatacompany/dataos/blob/main/documentation/concepts/resources/nilus/stream/pipeline-config.md)
* [Stream Sample Configs](/concepts/resources/nilus/stream/sample-configs.md)
* [Kafka](/concepts/resources/nilus/stream/stream-sources/kafka.md)

