# MongoDB

[MongoDB](https://www.mongodb.com/docs/) is a distributed NoSQL document database. Nilus reads from MongoDB as a **batch source**, with optional incremental ingestion driven by a monotonic field, and an aggregation-pipeline mode for source-side filtering and projection.

For row-level change capture from MongoDB, use the [MongoDB (CDC)](/concepts/resources/nilus/cdc/cdc-sources/mongodb.md) source instead.

## Requirements

Connectivity and credentials must both be in place before the pipeline can run.

### Connectivity

* The Nilus runtime must reach the MongoDB endpoint. Self-hosted clusters listen on TCP `27017` by default; managed Atlas clusters are typically reached via SRV-discovery URIs.
* The connector accepts both `mongodb://` and `mongodb+srv://` URI schemes.
* For Atlas, allowlist the runtime egress IP range under **Network Access** before the first run; otherwise connection attempts are dropped before authentication and surface as `ServerSelectionTimeoutError`.
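
A quick way to confirm reachability before the first run is a plain TCP check from inside the runtime network. The sketch below is illustrative only and is not part of Nilus: `parse_hosts` and `is_reachable` are hypothetical helper names, it assumes each entry in a comma-separated `host` value may carry its own `:port`, and it covers `mongodb://` endpoints only (SRV URIs additionally need a DNS lookup, and IPv6 literals are not handled).

```python
import socket


def parse_hosts(host, default_port=27017):
    """Split a comma-separated member list into (hostname, port) pairs."""
    members = []
    for entry in host.split(","):
        name, _, port = entry.strip().partition(":")
        # Fall back to MongoDB's default port when the entry has none.
        members.append((name, int(port) if port else default_port))
    return members


def is_reachable(hostname, port, timeout=3.0):
    """Return True if a TCP connection to hostname:port can be opened."""
    try:
        with socket.create_connection((hostname, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS failures alike.
        return False


print(parse_hosts("mongo-0.internal:27018, mongo-1.internal"))
```

Calling `is_reachable(name, port)` for each pair returned by `parse_hosts` shows which members the runtime can open a socket to; a `False` result points at routing, firewall, or allowlist problems rather than credentials.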

### Connection parameters

| Parameter        | Required    | Default | Description                                                                                                                               |
| ---------------- | ----------- | ------- | ----------------------------------------------------------------------------------------------------------------------------------------- |
| `host`           | Yes         | -       | MongoDB server hostname (or comma-separated replica-set member list).                                                                     |
| `port`           | No          | `27017` | MongoDB port. Omit when using `mongodb+srv://`.                                                                                           |
| `username`       | Conditional | -       | Required when the cluster enforces authentication.                                                                                        |
| `password`       | Conditional | -       | Required when the cluster enforces authentication.                                                                                        |
| Query parameters | No          | -       | Standard connection-string options forwarded to the driver: `tls=true`, `retryWrites=true`, `authSource=admin`, `replicaSet=<name>`, etc. |

### Database-side permissions

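The connection user needs the built-in `read` role on the source database, which covers every non-system collection in it:

```javascript
// Run against the database in which the user was created (often `admin`).
db.grantRolesToUser("<username>", [
  { role: "read", db: "<database_name>" }
])
```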

### URI format

```
mongodb://<username>:<password>@<host>:<port>?<optional-query-params>
```

SRV-discovery (Atlas-style):

```
mongodb+srv://<username>:<password>@<cluster>.mongodb.net?retryWrites=true&w=majority
```

Depot-backed (recommended for production):

```
dataos://<mongodb-depot>
```

> **Note** Do **not** put the database name in the URI path. The MongoDB source derives the database from the **first segment** of `source_table` (`<database>.<collection>`).
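
Credentials that contain characters such as `@`, `:` or `/` have to be percent-encoded before they are placed in a MongoDB URI. The sketch below assembles a URI in the shapes shown above, with no database path segment; `build_mongodb_uri` is a hypothetical helper for illustration, not a Nilus or driver function, and the hostnames are placeholders.

```python
from urllib.parse import quote_plus, urlencode


def build_mongodb_uri(username, password, host, port=27017, srv=False, **options):
    """Assemble a MongoDB connection URI without a database path segment."""
    scheme = "mongodb+srv" if srv else "mongodb"
    # Percent-encode credentials so reserved characters cannot break the URI.
    credentials = f"{quote_plus(username)}:{quote_plus(password)}@"
    # SRV URIs take a bare hostname; ports come from the DNS SRV record.
    netloc = host if srv else f"{host}:{port}"
    query = f"?{urlencode(options)}" if options else ""
    return f"{scheme}://{credentials}{netloc}{query}"


print(build_mongodb_uri("etl_user", "p@ss:word/1", "mongo.internal", tls="true"))
# mongodb://etl_user:p%40ss%3Aword%2F1@mongo.internal:27017?tls=true
```

The database is deliberately absent from the result because it is supplied through `source_table`.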

## Source options

| Option              | Required | Description                                                                                                                                                                                                       |
| ------------------- | -------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `source_table`      | Yes      | Either a plain `<database>.<collection>` reference, or `<database>.<collection>:[<aggregation-pipeline>]` to drive extraction with a source-side aggregation pipeline (see "Custom aggregation pipelines" below). |
| `incremental_key`   | No       | Document field used to identify newly visible documents per run. Should be indexed in MongoDB and consistently present across documents.                                                                          |
| `interval_start`    | No       | Optional ISO-8601 lower bound for the extraction window.                                                                                                                                                          |
| `interval_end`      | No       | Optional ISO-8601 upper bound for the extraction window.                                                                                                                                                          |
| `max_table_nesting` | No       | String. `"0"` flattens documents into wide rows for analytics targets; raise to `"1"` to preserve a single level of nested objects, `"2"` for two levels.                                                         |

> **Important** Earlier versions of this page documented `filter_`, `projection`, `chunk_size`, `parallel`, and `data_item_format` as YAML source options. **They are not currently forwarded** by the Nilus MongoDB source wrapper, so setting them in `source.options` has no effect. Use the aggregation-pipeline form of `source_table` for filtering and projection at the source side.

## Custom aggregation pipelines

To drive extraction with a MongoDB aggregation pipeline, suffix the collection name with `:[<pipeline>]`:

```yaml
source:
  address: dataos://mongodb-depot
  options:
    source_table: 'analytics.orders:[{"$match": {"status": "active"}}, {"$project": {"_id": 1, "customer_id": 1, "total": 1, "updated_at": 1}}]'
    incremental_key: updated_at
    interval_start: "2024-01-01T00:00:00Z"
```

Notes on the pipeline form:

* The pipeline must be a **JSON array** (not a `db.collection.aggregate(...)` shell expression). MongoDB Extended JSON v2 is supported via the BSON `json_util` parser.
* When `incremental_key` is set, the field **must** be present in the pipeline's output (that is, retained by any `$project` stage); otherwise the source raises a validation error before extraction begins.
* The connector substitutes `interval_start` / `interval_end` into the pipeline at runtime when both `incremental_key` and the interval bounds are present, so you can write `{"$match": {"updated_at": {"$gte": "$interval_start"}}}` and let the runtime fill in the value.
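
The rules above can be checked locally before a pipeline is deployed. The following is a minimal sketch of that validation, not the connector's actual implementation: `parse_source_table` and `keeps_field` are hypothetical names, plain `json.loads` stands in for the Extended JSON parser, and the projection check only understands top-level field names.

```python
import json


def keeps_field(projection, field):
    """Simplified check: does this $project spec retain a top-level field?"""
    if field in projection:
        return bool(projection[field])
    # Exclusion-only specs such as {"notes": 0} keep every unlisted field.
    return all(not value for name, value in projection.items() if name != "_id")


def parse_source_table(source_table, incremental_key=None):
    """Split '<database>.<collection>[:[<pipeline>]]' into its three parts."""
    reference, sep, raw_pipeline = source_table.partition(":")
    database, dot, collection = reference.partition(".")
    if not (database and dot and collection):
        raise ValueError("expected '<database>.<collection>'")
    if not sep:
        return database, collection, None

    # Assumption: json.loads is used here in place of bson.json_util.
    pipeline = json.loads(raw_pipeline)
    if not isinstance(pipeline, list):
        raise ValueError("pipeline must be a JSON array of stages")

    # Only the last $project stage decides which fields reach the output.
    projections = [s["$project"] for s in pipeline
                   if isinstance(s, dict) and "$project" in s]
    if incremental_key and projections and not keeps_field(projections[-1], incremental_key):
        raise ValueError(f"incremental key {incremental_key!r} is not projected")
    return database, collection, pipeline


print(parse_source_table(
    'analytics.orders:[{"$match": {"status": "active"}}, '
    '{"$project": {"_id": 1, "updated_at": 1}}]',
    incremental_key="updated_at",
)[:2])
```

Splitting on the first `:` and the first `.` keeps colons inside the JSON and dots inside collection names intact.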

## Sample Nilus configs

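The first example below is a complete manifest in the current Nilus pipeline shape. The two after it show only the `spec` block and omit the top-level fields and `compute` for brevity.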

### Batch, full-collection snapshot to Lakehouse

```yaml
name: nilus-mongodb-batch
version: v1alpha
type: nilus
description: MongoDB collection → DataOS Lakehouse snapshot
spec:
  type: batch
  compute: runnable-default
  source:
    address: dataos://mongodb-depot
    options:
      source_table: retail.customers
      max_table_nesting: "0"
  sink:
    address: dataos://analytics-lakehouse
    options:
      dest_table: retail.customers_raw
      incremental_strategy: replace
      aws_region: us-west-2
```

### Batch, incremental ingestion driven by `updated_at`

```yaml
spec:
  type: batch
  source:
    address: dataos://mongodb-depot
    options:
      source_table: retail.orders
      incremental_key: updated_at
      max_table_nesting: "0"
  sink:
    address: dataos://analytics-lakehouse
    options:
      dest_table: retail.orders_raw
      incremental_strategy: merge
```

### Batch, aggregation pipeline with projection

```yaml
spec:
  type: batch
  source:
    address: dataos://mongodb-depot
    options:
      source_table: 'analytics.events:[{"$match": {"event_type": "purchase"}}, {"$project": {"_id": 1, "user_id": 1, "amount": 1, "occurred_at": 1}}]'
      incremental_key: occurred_at
  sink:
    address: dataos://analytics-lakehouse
    options:
      dest_table: analytics.purchase_events
      incremental_strategy: append
```

## Behavior and capabilities

* **Compute model**: the Nilus runtime drives the cluster through the official `pymongo` driver; the cluster does the read.
* **Object model**: MongoDB databases and collections; one Nilus pipeline reads exactly one collection (or one aggregation pipeline output).
* **Pipeline mode**: `batch` only on this page; for row-level change capture see the [MongoDB (CDC)](/concepts/resources/nilus/cdc/cdc-sources/mongodb.md) source.
* **Aggregation-pipeline form**: when `source_table` carries a `:[pipeline]` suffix, the connector implicitly sets `max_table_nesting=1` on the resulting dataset because the pipeline output may already be partially flattened.
* **Document IDs**: MongoDB's native `_id` is preserved as-is on the source side; whether it surfaces in the destination depends on the destination's behavior (e.g., the [MongoDB](/concepts/resources/nilus/destinations/databases/mongodb.md) destination writes `_id` directly when present).
* **Incremental key requirements**: the field should be indexed in MongoDB so the source can scan a bounded range efficiently. Without an index, the cluster falls back to a full collection scan with a server-side filter.
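
To see why the index matters, it helps to picture the range filter that an incremental run amounts to. The sketch below builds such a filter from `interval_start` / `interval_end` values; it is illustrative only. `build_window_filter` is a hypothetical name, and the choice of an inclusive lower bound (`$gte`) with an exclusive upper bound (`$lt`) is an assumption, since this page does not state how the connector treats the boundaries.

```python
from datetime import datetime, timezone


def _parse_iso(value):
    """Parse an ISO-8601 timestamp, accepting a trailing 'Z' for UTC."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def build_window_filter(incremental_key, interval_start=None, interval_end=None):
    """Build a MongoDB range filter on the incremental key (assumed bounds)."""
    bounds = {}
    if interval_start is not None:
        bounds["$gte"] = _parse_iso(interval_start)  # assumed inclusive
    if interval_end is not None:
        bounds["$lt"] = _parse_iso(interval_end)  # assumed exclusive
    # No bounds means no filter, i.e. a full read of the collection.
    return {incremental_key: bounds} if bounds else {}


print(build_window_filter("updated_at", "2024-01-01T00:00:00Z"))
```

Running a filter of this shape through `find(...).explain()` in `mongosh` shows whether the cluster plans an index scan or a collection scan for it.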

## Troubleshooting

| Symptom                                                                  | Likely cause                                                                                 | Resolution                                                                                                                                                          |
| ------------------------------------------------------------------------ | -------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `ServerSelectionTimeoutError`                                            | Atlas IP allow-list rejects the runtime egress IP, or the SRV record could not be resolved.  | Add the runtime egress range to **Atlas Network Access**, and confirm DNS resolution for the SRV record from inside the runtime.                                    |
| `OperationFailure: Authentication failed`                                | Wrong username/password, wrong `authSource`, or the user is not provisioned on the database. | Verify credentials, add `authSource=admin` (or the correct auth DB) to the URI query, and confirm the user has `read` on the source database.                       |
| `Invalid MongoDB query format: …`                                        | `source_table` was supplied with a `:[…]` suffix that is not valid JSON / Extended JSON.     | Convert MongoDB shell syntax (`new Date(...)`, `ISODate(...)`, etc.) into Extended JSON v2 form, for example `{"$date": "2024-01-01T00:00:00Z"}`; PyMongo's `bson.json_util.dumps` can generate it. |
| `Query must be a JSON array representing a MongoDB aggregation pipeline` | The aggregation suffix decoded to a single document instead of a pipeline array.             | Wrap the stages in `[ ... ]`.                                                                                                                                       |
| `Incremental key '<field>' is not projected by the pipeline`             | The pipeline's final `$project` stage drops the `incremental_key` field.                     | Include the incremental key in the `$project` stage so the runtime can apply the watermark filter.                                                                  |
| Slow runs / high cluster CPU on `incremental_key` queries                | The incremental field is not indexed; the cluster does a full COLLSCAN per run.              | Build an index on the incremental field upstream: `db.<collection>.createIndex({ <field>: 1 })`.                                                                    |
| TLS handshake failures                                                   | Cluster requires TLS; URI did not opt in.                                                    | Append `?tls=true` (and, for self-signed dev clusters, `tlsAllowInvalidCertificates=true`).                                                                         |
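
For the `Invalid MongoDB query format` case, the usual culprit is a shell-style date. The sketch below shows one way to produce the relaxed Extended JSON v2 form of a date and embed it in a `source_table` value using only the standard library; `to_extended_json_date` is a hypothetical helper, and the collection and field names are placeholders taken from the examples on this page.

```python
import json
from datetime import datetime, timezone


def to_extended_json_date(moment):
    """Render a timezone-aware datetime as {"$date": "<ISO-8601 UTC>"}."""
    # Pass timezone-aware values; naive datetimes are read as local time.
    utc = moment.astimezone(timezone.utc)
    return {"$date": utc.strftime("%Y-%m-%dT%H:%M:%SZ")}


cutoff = datetime(2024, 1, 1, tzinfo=timezone.utc)
pipeline = [{"$match": {"updated_at": {"$gte": to_extended_json_date(cutoff)}}}]

# json.dumps guarantees the suffix is a valid JSON array of stages.
source_table = f"analytics.orders:{json.dumps(pipeline)}"
print(source_table)
# analytics.orders:[{"$match": {"updated_at": {"$gte": {"$date": "2024-01-01T00:00:00Z"}}}}]
```

Generating the suffix with `json.dumps` rather than writing it by hand also avoids the single-document mistake, because the Python value being serialized is already a list.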

## Related docs

* [MongoDB (CDC)](/concepts/resources/nilus/cdc/cdc-sources/mongodb.md): companion CDC connector for row-level change capture.
* [MongoDB](/concepts/resources/nilus/destinations/databases/mongodb.md): companion destination connector.
* [Optimize Sink Datasets](/concepts/resources/nilus/pipeline-optimization/optimize-sink-datasets.md): guidance on `incremental_strategy`, `max_table_nesting`, `partition_by`, and other dataset-shape settings.

