> For the complete documentation index, see [llms.txt](https://v2.dataos.info/llms.txt). Markdown versions of documentation pages are available by appending `.md` to page URLs; this page is available as [Markdown](https://v2.dataos.info/concepts/resources/nilus/stream/sample-configs.md).

# Stream Sample Configs

All examples below use the current Nilus pipeline shape with `spec.type: stream`:

```yaml
type: nilus
spec:
  type: stream
```

Streaming pipelines pull records in **micro-batches** and write them to the configured sink. Tune `batch_size` and the per-source timeout (`batch_timeout` for Kafka, `timeout` for NATS) so the consumer fills batches without blocking the sink.

{% hint style="info" %}
Always set `run_in_loop: true` under `source.options` for streaming services. Without it, Nilus exits after the first pull. Project secrets through `spec.use.projection`; never inline literal credentials in the URI.
{% endhint %}

### Kafka

<details>

<summary>Plaintext development cluster</summary>

```yaml
name: kafka-orders-stream-dev
version: v1alpha
type: nilus
spec:
  type: stream
  compute: universe-compute
  source:
    address: kafka://?bootstrap_servers=broker1:9092,broker2:9092&group_id=nilus-dev&batch_size=1000&batch_timeout=10
    options:
      source_table: orders
      run_in_loop: true
  sink:
    address: dataos://orders-lakehouse?purpose=rw
    options:
      dest_table: analytics.orders_stream
      incremental_strategy: append
```

</details>

<details>

<summary>SASL_SSL with SCRAM-SHA-512</summary>

```yaml
name: kafka-orders-stream-prod
version: v1alpha
type: nilus
spec:
  type: stream
  compute: universe-compute
  use:
    projection:
      secrets:
        - id: tenant:kafka-creds
          contextAlias: kafkaSecret
      projections:
        envVars:
          - key: KAFKA_USERNAME
            template: "{{ secrets['kafkaSecret'].username | base64_decode }}"
          - key: KAFKA_PASSWORD
            template: "{{ secrets['kafkaSecret'].password | base64_decode }}"
  source:
    address: kafka://?bootstrap_servers=broker.prod:9093&group_id=nilus-prod&security_protocol=SASL_SSL&sasl_mechanisms=SCRAM-SHA-512&sasl_username={KAFKA_USERNAME}&sasl_password={KAFKA_PASSWORD}&ssl_ca_location=/etc/dataos/secret/kafka-ca.pem&batch_size=2000&batch_timeout=10
    options:
      source_table: orders
      run_in_loop: true
  sink:
    address: dataos://orders-lakehouse?purpose=rw
    options:
      dest_table: analytics.orders_stream
      incremental_strategy: append
```

</details>

<details>

<summary>DataOS Depot</summary>

```yaml
name: kafka-orders-stream-depot
version: v1alpha
type: nilus
spec:
  type: stream
  compute: universe-compute
  source:
    address: dataos://kafkadepot?purpose=rw&batch_size=1000&batch_timeout=10&group_id=nilus-prod
    options:
      source_table: orders
      run_in_loop: true
  sink:
    address: dataos://orders-lakehouse?purpose=rw
    options:
      dest_table: analytics.orders_stream
      incremental_strategy: append
```

</details>

### NATS+JetStream

<details>

<summary>Local server, no auth</summary>

```yaml
name: nats-orders-stream-dev
version: v1alpha
type: nilus
spec:
  type: stream
  compute: universe-compute
  source:
    address: nats+jetstream://localhost:4222?subject=orders.created&batch_size=500&timeout=10
    options:
      source_table: orders-stream
      run_in_loop: true
  sink:
    address: dataos://orders-lakehouse?purpose=rw
    options:
      dest_table: analytics.orders_stream
      incremental_strategy: append
```

</details>

<details>

<summary>Username/password with durable consumer</summary>

```yaml
name: nats-orders-stream-prod
version: v1alpha
type: nilus
spec:
  type: stream
  compute: universe-compute
  use:
    projection:
      secrets:
        - id: tenant:nats-creds
          contextAlias: natsSecret
      projections:
        envVars:
          - key: NATS_USERNAME
            template: "{{ secrets['natsSecret'].username | base64_decode }}"
          - key: NATS_PASSWORD
            template: "{{ secrets['natsSecret'].password | base64_decode }}"
  source:
    address: nats+jetstream://{NATS_USERNAME}:{NATS_PASSWORD}@nats.prod:4222?subject=orders.created&durable=nilus-orders&batch_size=1000&timeout=30
    options:
      source_table: orders-stream
      run_in_loop: true
  sink:
    address: dataos://orders-lakehouse?purpose=rw
    options:
      dest_table: analytics.orders_stream
      incremental_strategy: append
```

</details>

<details>

<summary>NKeys seed (recommended for production)</summary>

```yaml
name: nats-orders-stream-nkeys
version: v1alpha
type: nilus
spec:
  type: stream
  compute: universe-compute
  use:
    projection:
      secrets:
        - id: tenant:nats-nkeys
          contextAlias: natsSecret
      projections:
        envVars:
          - key: NATS_NKEYS_SEED
            template: "{{ secrets['natsSecret'].seed | base64_decode }}"
  source:
    address: nats+jetstream://nats.prod:4222?subject=orders.created&durable=nilus-orders&batch_size=1000&timeout=30&nkeys_seed={NATS_NKEYS_SEED}
    options:
      source_table: orders-stream
      run_in_loop: true
  sink:
    address: dataos://orders-lakehouse?purpose=rw
    options:
      dest_table: analytics.orders_stream
      incremental_strategy: append
```

</details>

## Validation notes

* Use `type: nilus` and `spec.type: stream` for streaming connectors. Misclassifying a stream as `spec.type: batch` causes Nilus to run the pipeline once instead of staying connected.
* Set `run_in_loop: true` under `source.options` so the consumer keeps pulling new batches.
* Prefer `incremental_strategy: append` for streaming sinks. Use `merge` only when the destination has a stable primary key and you genuinely need deduplication.
* Project all credentials through `spec.use.projection`. Never inline literal usernames, passwords, tokens, or NKeys seeds.
* Keep `batch_size` aligned with what the sink can absorb in a single write; oversized batches cause back-pressure, undersized batches under-utilize the pipeline.

## Related docs

* [Understanding Stream Pipeline Config](/concepts/resources/nilus/stream.md)
* [Batch Sample Configs](/concepts/resources/nilus/batch/sample-configs.md)
* [CDC Sample Configs](/concepts/resources/nilus/cdc/sample-configs.md)
* [Optimize Sink Datasets](/concepts/resources/nilus/pipeline-optimization/optimize-sink-datasets.md): tune `incremental_strategy: append`, `batch_size`, partitioning, and other dataset-shape settings for streaming sinks.
* [Kafka](/concepts/resources/nilus/stream/stream-sources/kafka.md)
* [NATS](/concepts/resources/nilus/stream/stream-sources/nats.md)


---

# Agent Instructions
This documentation is published with GitBook. GitBook is the documentation platform designed so that both humans and AI agents can read, navigate, and reason over technical content effectively. Learn more at gitbook.com.

## Querying This Documentation
If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter, and the optional `goal` query parameter:

```
GET https://v2.dataos.info/concepts/resources/nilus/stream/sample-configs.md?ask=<question>&goal=<endgoal>
```

`ask` is the immediate question: it should be specific, self-contained, and written in natural language.
`goal` is optional and describes the broader end goal you are ultimately trying to accomplish on behalf of the user. GitBook uses it to tailor the answer towards what is most useful for that goal.

The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
