
# Kafka

[Apache Kafka](https://kafka.apache.org/) is a distributed event-streaming platform used to publish, subscribe to, store, and process streams of records. Nilus consumes Kafka as a **stream source** (`spec.type: stream`): a long-running consumer pulls messages in configurable batches, deserializes them, and writes the result to the configured sink.

Nilus connects to Kafka in one of two ways:

* **Direct URI**: `kafka://?bootstrap_servers=...` with all connection options expressed as query parameters.
* **DataOS Depot**: `dataos://<depot>?purpose=rw` for clusters whose connection string, credentials, and TLS material are managed centrally in DataOS.

{% hint style="info" %}
Although Kafka is an event-streaming platform, Nilus reads from it in **micro-batches** rather than record by record. Tune `batch_size` and `batch_timeout` to match the sink's write throughput.
{% endhint %}

## Requirements

Connectivity and credentials must both be in place before the pipeline can run.

### Connectivity

* The Nilus runtime must reach every broker listed in `bootstrap_servers` over the configured port (commonly `9092` for plaintext or `9093` for TLS).
* For SASL/SSL clusters, the runtime needs read access to any certificate, key, or truststore files referenced from the URI.
* The consumer must have permission to read from the target topic and to commit offsets for the chosen `group_id`.
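
A quick way to check the first connectivity requirement is to try a plain TCP connection to each broker from inside the runtime's network. The Python sketch below is a standalone helper, not part of Nilus. `parse_brokers` and `unreachable_brokers` are hypothetical names. A successful connection only shows that a socket can be opened. It does not test TLS, SASL, or topic permissions. The sketch also assumes hostnames or IPv4 addresses, not bracketed IPv6 literals.

```python
import socket


def parse_brokers(bootstrap_servers: str) -> list:
    """Split a comma-separated `host:port` list into (host, port) pairs."""
    brokers = []
    for entry in bootstrap_servers.split(","):
        entry = entry.strip()
        if not entry:
            continue
        # rpartition treats everything before the last ':' as the host.
        host, _, port = entry.rpartition(":")
        if not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        brokers.append((host, int(port)))
    return brokers


def unreachable_brokers(bootstrap_servers: str, timeout: float = 3.0) -> list:
    """Return the brokers that refuse or time out on a plain TCP connect."""
    failed = []
    for host, port in parse_brokers(bootstrap_servers):
        try:
            # Opens and immediately closes a TCP socket; no Kafka protocol or TLS handshake.
            with socket.create_connection((host, port), timeout=timeout):
                pass
        except OSError:
            failed.append(f"{host}:{port}")
    return failed
```

For example, `unreachable_brokers("broker1:9092,broker2:9092")` returns an empty list when both brokers accept TCP connections.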

### Required parameters

| Parameter           | Required | Default | Description                                                                  |
| ------------------- | -------- | ------- | ---------------------------------------------------------------------------- |
| `bootstrap_servers` | Yes      | -       | Comma-separated list of Kafka brokers in `host:port` format. Needed only for direct `kafka://` URIs; a depot supplies its own brokers. |
| `group_id`          | Yes      | -       | Consumer group identifier used for partition assignment and offset tracking. |
| `source_table`      | Yes      | -       | Kafka topic to consume from. Passed under `source.options`.                  |
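
Before deploying, you can check a pipeline's `source` block for the required parameters. The Python sketch below follows the depot examples on this page and assumes that a `dataos://` depot address supplies the brokers. For depot addresses it therefore checks only `group_id` in the query string. `missing_required` is a hypothetical helper, not a Nilus API.

```python
from urllib.parse import parse_qs, urlsplit


def missing_required(address: str, source_options: dict) -> list:
    """List required Kafka source parameters absent from a pipeline's source block."""
    parts = urlsplit(address)
    query = parse_qs(parts.query)
    if parts.scheme == "kafka":
        required = ["bootstrap_servers", "group_id"]
    elif parts.scheme == "dataos":
        # Assumption: the depot definition provides the broker list.
        required = ["group_id"]
    else:
        raise ValueError(f"unsupported scheme {parts.scheme!r}")
    missing = [name for name in required if not query.get(name)]
    if not source_options.get("source_table"):
        missing.append("source_table")  # the topic name lives under source.options
    return missing


print(missing_required("kafka://?bootstrap_servers=broker1:9092", {}))
```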

### Optional parameters

| Parameter                  | Default     | Description                                                                                           |
| -------------------------- | ----------- | ----------------------------------------------------------------------------------------------------- |
| `batch_size`               | `3000`      | Maximum number of messages pulled per micro-batch.                                                    |
| `batch_timeout`            | `3`         | Maximum seconds Nilus waits to fill a batch before flushing.                                          |
| `security_protocol`        | `PLAINTEXT` | One of `PLAINTEXT`, `SSL`, `SASL_PLAINTEXT`, `SASL_SSL`.                                              |
| `sasl_mechanisms`          | -           | Required when `security_protocol` is `SASL_PLAINTEXT` or `SASL_SSL`. Examples: `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512`. |
| `sasl_username`            | -           | SASL username. Supply it through a secret projection.                                                 |
| `sasl_password`            | -           | SASL password. Supply it through a secret projection.                                                 |
| `ssl_ca_location`          | -           | Path to the CA certificate that signs the broker certificates.                                        |
| `ssl_certificate_location` | -           | Path to the client certificate (mutual TLS).                                                          |
| `ssl_key_location`         | -           | Path to the client key (mutual TLS).                                                                  |
| `ssl_key_password`         | -           | Password for the client key.                                                                          |
| `ssl_truststore_location`  | -           | Path to a Java-style truststore (`.jks` or `.p12`).                                                   |
| `ssl_truststore_password`  | -           | Truststore password.                                                                                  |
| `ssl_truststore_type`      | `JKS`       | Truststore type: `JKS` or `PKCS12`.                                                                   |
| `run_in_loop`              | `false`     | When `true`, the stream service stays running and continues pulling new batches indefinitely.         |
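
The rules in this table can be expressed as a small pre-flight check. A SASL protocol needs `sasl_mechanisms` and credentials, and mutual TLS needs both a client certificate and a key. The Python sketch below is a hypothetical helper for direct `kafka://` URIs and assumes a PLAIN or SCRAM mechanism, which authenticates with a username and password. It does not restrict `sasl_mechanisms` to the listed values, because the table presents them as examples, not as an exhaustive list.

```python
SECURITY_PROTOCOLS = {"PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"}
TRUSTSTORE_TYPES = {"JKS", "PKCS12"}


def security_problems(opts: dict) -> list:
    """Check SASL/SSL options from a direct kafka:// URI for obvious gaps (hypothetical helper)."""
    problems = []
    protocol = opts.get("security_protocol", "PLAINTEXT")  # documented default
    if protocol not in SECURITY_PROTOCOLS:
        problems.append(f"unknown security_protocol {protocol!r}")
    if protocol in ("SASL_PLAINTEXT", "SASL_SSL"):
        if not opts.get("sasl_mechanisms"):
            problems.append("sasl_mechanisms is required for SASL protocols")
        # PLAIN and SCRAM mechanisms authenticate with a username/password pair.
        for key in ("sasl_username", "sasl_password"):
            if not opts.get(key):
                problems.append(f"{key} is missing")
    # Mutual TLS needs the client certificate and its key together.
    if bool(opts.get("ssl_certificate_location")) != bool(opts.get("ssl_key_location")):
        problems.append("ssl_certificate_location and ssl_key_location must be set together")
    if opts.get("ssl_truststore_type", "JKS") not in TRUSTSTORE_TYPES:
        problems.append("ssl_truststore_type must be JKS or PKCS12")
    return problems


print(security_problems({"security_protocol": "SASL_SSL", "sasl_mechanisms": "SCRAM-SHA-512",
                         "sasl_username": "u", "sasl_password": "p"}))  # []
```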

{% hint style="info" %}
Treat `sasl_password`, `ssl_key_password`, and `ssl_truststore_password` as secrets. Project them through `spec.use.projection` rather than inlining literal values.
{% endhint %}

### URI formats

```
kafka://?bootstrap_servers=<brokers>&group_id=<consumer-group>[&<option>=<value>...]
dataos://<kafka-depot>?purpose=rw[&<option>=<value>...]
```
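
Every connection option travels in the query string. A value that contains `&`, `=`, or `@`, which is common in passwords, must therefore be percent-encoded, or a standard URL parser will split it. The Python sketch below builds a direct URI with `urllib.parse` and shows what happens to an unencoded password. It assumes Nilus decodes percent-encoded query values the same way a standard URL parser does. `kafka_uri` and `query_params` are hypothetical helpers.

```python
from urllib.parse import parse_qs, quote, urlencode, urlsplit


def kafka_uri(bootstrap_servers: str, group_id: str, **options: object) -> str:
    """Build a direct kafka:// source address with percent-encoded values (hypothetical helper)."""
    params = {"bootstrap_servers": bootstrap_servers, "group_id": group_id, **options}
    # Keep ',' and ':' readable in broker lists; '&', '=', '@', etc. are percent-encoded.
    return "kafka://?" + urlencode(params, safe=",:", quote_via=quote)


def query_params(uri: str) -> dict:
    """Decode the query string the way a standard URL parser would."""
    return {key: values[0] for key, values in parse_qs(urlsplit(uri).query).items()}


encoded = kafka_uri("broker1:9092,broker2:9092", "nilus-dev", sasl_password="p@ss&w=rd")
print(query_params(encoded)["sasl_password"])  # p@ss&w=rd

naive = "kafka://?bootstrap_servers=broker1:9092&group_id=nilus-dev&sasl_password=p@ss&w=rd"
print(query_params(naive)["sasl_password"])  # p@ss (truncated at the unencoded '&')
```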

### Depot-backed SASL\_SSL setup

For a SASL\_SSL cluster, define a Kafka **depot** plus a **secret** that carries both the SASL credentials and the TLS material. Nilus projects the certificate files into the runtime and wires them to `ssl_ca_location` / `ssl_truststore_location` automatically, so you do not need to pass any `ssl_*_location` paths in the address.

**Secret:** SASL credentials in `data`, certificate/truststore files in `files`:

```yaml
name: niluskafkasslsecret
version: v2alpha
type: secret
description: Kafka SASL_SSL credentials and certs for Nilus
layer: user
secret:
  type: key-value
  data:
    security_protocol: SASL_SSL
    sasl_mechanism: PLAIN          # or SCRAM-SHA-256 / SCRAM-SHA-512
    username: "<sasl-username>"
    password: "<sasl-password>"
    trust_store_type: JKS
    trust_store_password: "<truststore-password>"
  files:
    ca_file: <path>/ca-root.pem
    trust_store_file: <path>/kafka.truststore.jks
```

**Depot:** references the secret with a `purpose`:

```yaml
version: v2alpha
name: "niluskafkassldepot"
type: depot
tags:
  - kafka
layer: user
spec:
  type: kafka
  description: "Kafka SASL_SSL depot for real-time ingestion"
  spec:
    brokers:
      - kafka-sasl-ssl-tmdc.dataos.info:9093
  secrets:
    - id: "<workspace>:niluskafkasslsecret"
      purpose: rw
```

The pipeline then references only the depot. Consumer options such as `group_id`, `batch_size`, and `batch_timeout` are appended to the address as query parameters:

```yaml
source:
  address: dataos://niluskafkassldepot?purpose=rw&group_id=nilus-saslssl-group&batch_size=1000&batch_timeout=3
  options:
    source_table: <kafka-topic>
```

## Core concepts

* **Consumer groups.** Nilus relies on Kafka's native consumer-group protocol for partition assignment and offset commits. Each pipeline must use a stable `group_id` so restarts resume from the last committed offset.
* **Micro-batching.** Messages are pulled in batches sized by `batch_size` and bounded by `batch_timeout`. The whole batch is processed and then committed before the next pull.
* **Message shape.** Records are deserialized as JSON by default. The sink receives the message payload alongside Kafka metadata (`topic`, `partition`, `offset`, `timestamp`, `key`).
* **Security.** SASL (`PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512`) and SSL/TLS (server-only or mutual) are both supported. FIPS-compliant truststores (`.jks` / `.p12`) are honored when configured.
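
The micro-batching rule flushes a batch when `batch_size` messages have arrived or when `batch_timeout` seconds have passed, whichever comes first. The Python sketch below illustrates that rule, with an in-memory `queue.Queue` standing in for the Kafka consumer. It is not the Nilus consumer, and it leaves out the offset commit that follows each sink write. `pull_batch` is a hypothetical name.

```python
import queue
import time


def pull_batch(source: queue.Queue, batch_size: int = 3000, batch_timeout: float = 3.0) -> list:
    """Collect one micro-batch from `source`.

    Stops as soon as `batch_size` messages are collected or `batch_timeout`
    seconds have elapsed since the batch started, whichever happens first.
    Defaults mirror the documented Nilus defaults (3000 messages, 3 seconds).
    """
    batch = []
    deadline = time.monotonic() + batch_timeout
    while len(batch) < batch_size:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # timeout reached: flush whatever has arrived
        try:
            # Block only for the time left in this batch window.
            batch.append(source.get(timeout=remaining))
        except queue.Empty:
            break  # nothing else arrived before the deadline
    return batch
```

A `run_in_loop`-style driver would call `pull_batch` repeatedly: write each batch to the sink, then commit offsets, then pull again.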

## Source options

| Option                | Required        | Description                                      | Passed in                     |
| --------------------- | --------------- | ------------------------------------------------ | ----------------------------- |
| `source_table`        | Yes             | Kafka topic name.                                | `source.options`              |
| `bootstrap_servers`   | Direct URI only | Comma-separated broker list (`host:port`).       | `source.address` query string |
| `group_id`            | Yes             | Consumer group identifier.                       | `source.address` query string |
| `batch_size`          | No              | Number of messages per micro-batch.              | `source.address` query string |
| `batch_timeout`       | No              | Maximum seconds to wait for a batch.             | `source.address` query string |
| `run_in_loop`         | No              | Keep the consumer running after the first batch. | `source.options`              |
| Security & SSL params | No              | SASL/SSL options from the optional parameters.   | `source.address` query string |
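
The table splits settings between two places: the `source.address` query string and `source.options`. The Python sketch below routes a flat settings dictionary into those two places. The key set comes from the table above, and `split_source_settings` is a hypothetical helper, not a Nilus API.

```python
from urllib.parse import quote, urlencode

# From the "Source options" table: these keys belong under source.options;
# every other key is appended to the source.address query string.
OPTIONS_KEYS = {"source_table", "run_in_loop"}


def split_source_settings(base_address: str, settings: dict) -> dict:
    """Route flat settings into a Nilus-style source block (hypothetical helper)."""
    query = {k: v for k, v in settings.items() if k not in OPTIONS_KEYS}
    options = {k: v for k, v in settings.items() if k in OPTIONS_KEYS}
    address = base_address
    if query:
        separator = "&" if "?" in base_address else "?"
        address += separator + urlencode(query, safe=",:", quote_via=quote)
    return {"address": address, "options": options}


source = split_source_settings(
    "dataos://kafkadepot?purpose=rw",
    {"group_id": "nilus-prod", "batch_size": 1000, "batch_timeout": 10,
     "source_table": "orders", "run_in_loop": True},
)
print(source["address"])
# dataos://kafkadepot?purpose=rw&group_id=nilus-prod&batch_size=1000&batch_timeout=10
print(source["options"])
# {'source_table': 'orders', 'run_in_loop': True}
```

The resulting address and options carry the same settings as the `source` block in the sample config below, though the query parameters appear in a different order.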

### Examples

**Plaintext development cluster**

```yaml
source:
  address: kafka://?bootstrap_servers=broker1:9092,broker2:9092&group_id=nilus-dev
  options:
    source_table: orders
    run_in_loop: true
```

**SASL\_SSL with a private CA**

```yaml
source:
  address: kafka://?bootstrap_servers=broker:9093&group_id=nilus-prod&security_protocol=SASL_SSL&sasl_mechanisms=SCRAM-SHA-512&sasl_username={KAFKA_USERNAME}&sasl_password={KAFKA_PASSWORD}&ssl_ca_location=/etc/dataos/secret/kafka-ca.pem
  options:
    source_table: orders
    run_in_loop: true
```

**Throughput tuning**

```yaml
source:
  address: kafka://?bootstrap_servers=broker:9092&group_id=nilus-bulk&batch_size=10000&batch_timeout=10
  options:
    source_table: clickstream
    run_in_loop: true
```
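
When choosing values like the ones above, it helps to estimate how often a batch flushes at a given message rate. The Python arithmetic below assumes a steady rate and ignores processing and sink-write time. The 2,000 and 200 messages-per-second rates are illustrative, not measured.

```python
def seconds_per_batch(messages_per_second: float, batch_size: int, batch_timeout: float) -> float:
    """Time until a batch flushes: when it fills up or when batch_timeout elapses, whichever is first."""
    if messages_per_second <= 0:
        return batch_timeout  # idle topic: only the timeout can trigger a flush
    return min(batch_size / messages_per_second, batch_timeout)


def messages_per_batch(messages_per_second: float, batch_size: int, batch_timeout: float) -> int:
    """Messages in each flushed batch at a steady rate (capped at batch_size)."""
    return min(batch_size, max(0, int(messages_per_second * batch_timeout)))


# Throughput-tuning example: batch_size=10000, batch_timeout=10
print(seconds_per_batch(2000, 10000, 10), messages_per_batch(2000, 10000, 10))  # 5.0 10000
print(seconds_per_batch(200, 10000, 10), messages_per_batch(200, 10000, 10))    # 10 2000
```

At 2,000 msg/s the `clickstream` settings fill a 10,000-message batch in 5 s. At 200 msg/s the 10 s timeout fires first, with about 2,000 messages in the batch.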

## Sink options

| Option                 | Required                           | Description                                                                                 |
| ---------------------- | ---------------------------------- | ------------------------------------------------------------------------------------------- |
| `dest_table`           | Yes                                | Destination object (`schema.table` for warehouses, fully-qualified path for object stores). |
| `incremental_strategy` | Yes                                | Write strategy: `append`, `replace`, or `merge`. Streaming pipelines normally use `append`. |
| `primary_key`          | When `incremental_strategy: merge` | Column(s) used to deduplicate.                                                              |
| `create_table`         | No                                 | Auto-create the destination if missing. Default: `true`.                                    |

### Example sink

```yaml
sink:
  address: dataos://orders-lakehouse?purpose=rw
  options:
    dest_table: analytics.orders_stream
    incremental_strategy: append
```
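
With `incremental_strategy: append`, each consumed record becomes a new row. The Python sketch below shows one plausible way to flatten a JSON record and its Kafka metadata into a row. The `kafka_*` column names are hypothetical. This page lists which metadata reaches the sink (`topic`, `partition`, `offset`, `timestamp`, `key`), but not the exact column layout, so check the destination table for the real names.

```python
import json
from typing import Optional


def record_to_row(topic: str, partition: int, offset: int, timestamp_ms: int,
                  key: Optional[bytes], value: bytes) -> dict:
    """Flatten one JSON Kafka record plus its metadata into a row (hypothetical layout)."""
    payload = json.loads(value)  # JSON is the default deserialization
    # Non-object payloads (arrays, scalars) are wrapped so every row is a dict.
    row = dict(payload) if isinstance(payload, dict) else {"value": payload}
    row.update({
        "kafka_topic": topic,
        "kafka_partition": partition,
        "kafka_offset": offset,
        "kafka_timestamp_ms": timestamp_ms,
        "kafka_key": key.decode("utf-8") if key is not None else None,
    })
    return row


print(record_to_row("orders", 0, 42, 1700000000000, b"order-1", b'{"order_id": 1, "amount": 9.5}'))
```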

## Sample Nilus config

```yaml
name: kafka-orders-stream
version: v1alpha
type: nilus
tags:
  - nilus-stream
description: Stream Kafka orders topic into the lakehouse.
spec:
  type: stream
  compute: runnable-default
  logLevel: ERROR
  resources:
    requests:
      cpu: 100m
      memory: 256Mi
  source:
    address: dataos://kafkadepot?purpose=rw&batch_size=1000&batch_timeout=10&group_id=nilus-prod
    options:
      source_table: orders
      run_in_loop: true
  sink:
    address: dataos://orders-lakehouse?purpose=rw
    options:
      dest_table: analytics.orders_stream
      incremental_strategy: append
```

## Best practices

* Use a **dedicated consumer group** per pipeline so offsets do not collide with other consumers.
* Keep `batch_size` aligned with the destination's preferred write size; oversized batches cause sink-side back-pressure, undersized batches under-utilize the pipeline.
* For low-traffic topics, raise `batch_timeout` so Nilus does not flush near-empty batches.
* Always set `run_in_loop: true` for long-running stream services. Without it, Nilus exits after the first pull.
* Project secrets through `spec.use.projection`; never commit literal `sasl_password` or key passwords to YAML.
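
The first practice is hard to enforce by eye across many pipelines. The Python sketch below scans each pipeline's `source.address` and reports any `group_id` used by more than one pipeline. `shared_group_ids` is a hypothetical helper, and the pipeline names in the example are illustrative.

```python
from collections import defaultdict
from urllib.parse import parse_qs, urlsplit


def shared_group_ids(addresses: dict) -> dict:
    """Map each group_id used by more than one pipeline to the sorted pipeline names using it."""
    users = defaultdict(list)
    for pipeline, address in addresses.items():
        for group_id in parse_qs(urlsplit(address).query).get("group_id", []):
            users[group_id].append(pipeline)
    return {group: sorted(names) for group, names in users.items() if len(names) > 1}


print(shared_group_ids({
    "kafka-orders-stream": "dataos://kafkadepot?purpose=rw&group_id=nilus-prod",
    "kafka-orders-backfill": "kafka://?bootstrap_servers=broker:9093&group_id=nilus-prod",
    "kafka-clicks": "kafka://?bootstrap_servers=broker:9092&group_id=nilus-bulk",
}))  # {'nilus-prod': ['kafka-orders-backfill', 'kafka-orders-stream']}
```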

## Troubleshooting

| Symptom                              | Likely cause                                                     | Resolution                                                                                                |
| ------------------------------------ | ---------------------------------------------------------------- | --------------------------------------------------------------------------------------------------------- |
| `Topic does not exist`               | `source_table` mismatch or topic not yet created on the cluster. | Verify the topic name, then either provision the topic or enable topic auto-creation on the cluster.      |
| `SASL authentication failed`         | Wrong mechanism or credentials.                                  | Confirm `sasl_mechanisms` matches the broker config and that the projected username/password are correct. |
| `SSL handshake failed`               | Missing or wrong certificate paths.                              | Confirm `ssl_ca_location` (and client cert/key when using mTLS) exist inside the runtime container.       |
| Pipeline exits after the first batch | `run_in_loop` not set.                                           | Set `run_in_loop: true` under `source.options`.                                                           |
| Offsets stuck / replays old data     | Stale `group_id`.                                                | Switch to a new `group_id`, or reset the group's offsets with Kafka's `kafka-consumer-groups.sh --reset-offsets`. |

## Related docs

* [Understanding Stream Data Movement](/concepts/resources/nilus/stream.md)
* [Understanding Stream Pipeline Config](https://github.com/moderndatacompany/dataos/blob/main/documentation/concepts/resources/nilus/stream/pipeline-config.md)
* [Stream Sample Configs](/concepts/resources/nilus/stream/sample-configs.md)
* [NATS](/concepts/resources/nilus/stream/stream-sources/nats.md)

