Connectors

MongoDB CDC Connector

A Go-based connector that replicates data from MongoDB to RawTree using Change Streams for CDC (Change Data Capture).

How it works

The connector works in two phases: an initial snapshot, followed by continuous change data capture (CDC). During the snapshot phase, the connector records the current MongoDB cluster time and then performs a full Find() scan of each collection. Every existing document is sent to RawTree as a synthetic insert event, and the recorded cluster time is saved as the starting point for CDC.
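The snapshot phase can be sketched in Go under some simplifying assumptions. The `Source` interface, `memSource`, `Event`, and `runSnapshot` are hypothetical names, not the connector's code. The cluster time is reduced to a plain `uint64`, whereas MongoDB actually uses a BSON timestamp. An in-memory source stands in for the MongoDB driver so that the example runs without a database.

```go
package main

import "fmt"

// Event is a hypothetical CDC event; snapshot rows are emitted as synthetic inserts.
type Event struct {
	Op         string
	Collection string
	Doc        map[string]any
}

// Source is a hypothetical abstraction over the MongoDB client.
type Source interface {
	ClusterTime() uint64                        // current cluster time (simplified to uint64)
	FindAll(collection string) []map[string]any // stands in for a full Find() scan
}

// runSnapshot records the cluster time first, scans every collection, and
// emits each existing document as a synthetic insert. The returned cluster
// time is where the change stream should start.
func runSnapshot(src Source, collections []string, emit func(Event)) uint64 {
	start := src.ClusterTime()
	for _, coll := range collections {
		for _, doc := range src.FindAll(coll) {
			emit(Event{Op: "insert", Collection: coll, Doc: doc})
		}
	}
	return start
}

// memSource is an in-memory Source used only for this example.
type memSource struct {
	time uint64
	data map[string][]map[string]any
}

func (m memSource) ClusterTime() uint64               { return m.time }
func (m memSource) FindAll(c string) []map[string]any { return m.data[c] }

func main() {
	src := memSource{time: 42, data: map[string][]map[string]any{
		"orders": {{"_id": 1}, {"_id": 2}},
		"users":  {{"_id": "u1"}},
	}}
	var events []Event
	resumeAt := runSnapshot(src, []string{"orders", "users"}, func(e Event) {
		events = append(events, e)
	})
	fmt.Printf("emitted %d synthetic inserts; CDC starts at cluster time %d\n", len(events), resumeAt)
}
```

The sketch records the cluster time before the scan rather than after it. As a result, the change stream later replays any writes that land during the scan. Some documents may therefore arrive twice, which is consistent with the at-least-once delivery guarantee.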

Once the snapshot is complete, the connector opens a MongoDB Change Stream starting from the saved cluster time and receives insert, update, replace, and delete events in real time. These events are batched and delivered to RawTree through its HTTP API, and resume tokens are periodically persisted to disk for fault tolerance. After a crash or restart, the connector resumes from the last confirmed checkpoint. If the oplog has rolled over and no longer contains the saved resume token, the connector automatically performs a new snapshot to recover safely.
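A minimal sketch of checkpoint persistence, assuming the resume token is stored as a small JSON file on local disk. `Checkpoint`, `saveCheckpoint`, `loadCheckpoint`, `clearCheckpoint`, the file layout, and the token value are all hypothetical. The sketch writes to a temporary file and then renames it over the old one, so a crash mid-write cannot leave a truncated checkpoint behind. A missing file is the signal to run a snapshot first.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
)

// Checkpoint is a hypothetical on-disk record of CDC progress.
type Checkpoint struct {
	ResumeToken json.RawMessage `json:"resume_token"` // opaque token of the last confirmed event
}

// saveCheckpoint writes atomically: temp file in the same directory, fsync, then rename.
func saveCheckpoint(path string, cp Checkpoint) error {
	data, err := json.Marshal(cp)
	if err != nil {
		return err
	}
	tmp, err := os.CreateTemp(filepath.Dir(path), ".checkpoint-*")
	if err != nil {
		return err
	}
	defer os.Remove(tmp.Name()) // cleans up on failure; harmless after a successful rename
	if _, err := tmp.Write(data); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Sync(); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	return os.Rename(tmp.Name(), path)
}

// loadCheckpoint returns (nil, nil) when no checkpoint exists yet,
// which the caller treats as "run a snapshot first".
func loadCheckpoint(path string) (*Checkpoint, error) {
	data, err := os.ReadFile(path)
	if errors.Is(err, fs.ErrNotExist) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	var cp Checkpoint
	if err := json.Unmarshal(data, &cp); err != nil {
		return nil, err
	}
	return &cp, nil
}

// clearCheckpoint discards saved progress, e.g. when the oplog has rolled
// past the token, so the next start performs a fresh snapshot.
func clearCheckpoint(path string) error {
	err := os.Remove(path)
	if errors.Is(err, fs.ErrNotExist) {
		return nil
	}
	return err
}

func main() {
	dir, err := os.MkdirTemp("", "cdc-checkpoint")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)
	path := filepath.Join(dir, "checkpoint.json")

	if cp, _ := loadCheckpoint(path); cp == nil {
		fmt.Println("no checkpoint: run a snapshot first")
	}
	token := json.RawMessage(`{"_data":"8265A1"}`) // placeholder token value
	if err := saveCheckpoint(path, Checkpoint{ResumeToken: token}); err != nil {
		panic(err)
	}
	cp, err := loadCheckpoint(path)
	if err != nil {
		panic(err)
	}
	fmt.Println("resume after:", string(cp.ResumeToken))
}
```

Clearing the checkpoint is one way to model the recovery path. When the saved token is no longer usable because the oplog has rolled past it, discarding the token forces the next run through the snapshot phase.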

Each MongoDB collection maps to one RawTree table, named with the configured table prefix (mongo_ by default) followed by the collection name. For example, ecommerce.orders becomes mongo_orders, and ecommerce.users becomes mongo_users.
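The naming rule can be expressed as a small Go function. `tableName` is a hypothetical helper, and the connector may also sanitize or validate names in ways not shown here. The function splits on the first dot because MongoDB database names cannot contain dots, while collection names can.

```go
package main

import (
	"fmt"
	"strings"
)

// tableName maps a MongoDB namespace ("db.collection") to a RawTree table
// name by prepending the configured prefix to the collection part.
func tableName(prefix, namespace string) string {
	coll := namespace
	if i := strings.IndexByte(namespace, '.'); i >= 0 {
		coll = namespace[i+1:] // everything after the database name
	}
	return prefix + coll
}

func main() {
	fmt.Println(tableName("mongo_", "ecommerce.orders"))
	fmt.Println(tableName("mongo_", "ecommerce.users"))
}
```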


Features

  • Full snapshot: Initial replication of existing collections
  • CDC streaming: Real-time change capture via MongoDB Change Streams
  • Multi-collection: Watch multiple collections with regex filtering
  • At-least-once delivery: Checkpoint-based recovery with resume tokens
  • Auto table creation: RawTree tables are created automatically
  • Backpressure: Configurable batch sizes and flush intervals
  • Observability: Prometheus metrics, health/readiness endpoints
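The batching and backpressure behavior can be approximated with a channel and a ticker. This is a hypothetical sketch, not the connector's implementation: `runBatcher` and its `maxSize` and `interval` parameters only stand in for the configurable batch size and flush interval. Because `flush` runs synchronously, a slow RawTree endpoint stalls the loop. The bounded channel then fills, and producers block until the sink catches up.

```go
package main

import (
	"fmt"
	"time"
)

// runBatcher groups incoming rows and calls flush when the batch reaches
// maxSize, when interval elapses with rows pending, or when in is closed.
// flush runs on this goroutine, so a slow sink applies backpressure upstream.
func runBatcher[T any](in <-chan T, maxSize int, interval time.Duration, flush func([]T)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	batch := make([]T, 0, maxSize)
	send := func() {
		if len(batch) > 0 {
			flush(batch)
			batch = make([]T, 0, maxSize) // fresh slice: flush may keep the old one
		}
	}
	for {
		select {
		case row, ok := <-in:
			if !ok {
				send() // deliver the final partial batch
				return
			}
			batch = append(batch, row)
			if len(batch) >= maxSize {
				send()
			}
		case <-ticker.C:
			send() // time-based flush keeps latency bounded on quiet streams
		}
	}
}

func main() {
	in := make(chan int, 4) // bounded buffer: producers block when it is full
	go func() {
		for i := 1; i <= 5; i++ {
			in <- i
		}
		close(in)
	}()
	runBatcher[int](in, 2, time.Second, func(b []int) { fmt.Println("flush", b) })
}
```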

The connector requires MongoDB 4.0+ running as a replica set or sharded cluster (Change Streams are not available on standalone servers), plus Go 1.23+ if you build from source.

Quick Start

Clone the repository and install dependencies (Go 1.23+, mongoimport, mongosh, and direnv).

git clone https://github.com/rawtreedb/rawtree-mongo-connector.git
cd rawtree-mongo-connector

Docker Compose

# Edit docker-compose.yaml with your RawTree API key
docker compose up

Binary

make build
./bin/rawtree-mongo-connector --config config.example.yaml

Docker

make docker
docker run -v "$(pwd)/config.yaml:/etc/connector/config.yaml" rawtree-mongo-connector:dev

Helm (Kubernetes)

helm install mongo-connector deploy/helm/rawtree-mongo-connector \
  --set config.mongodb.database=mydb \
  --set config.rawtree.endpoint=https://api.rawtree.com \
  --set secrets.mongodbUri="mongodb+srv://..." \
  --set secrets.rawtreeApiKey="rt_..."

Configuration

See config.example.yaml for all options. Key settings:

Setting | Env Override | Description
mongodb.uri | RAWTREE_MONGO_MONGODB_URI | MongoDB connection string
mongodb.database | RAWTREE_MONGO_MONGODB_DATABASE | Database to watch
rawtree.endpoint | RAWTREE_MONGO_RAWTREE_ENDPOINT | RawTree API URL
rawtree.api_key | RAWTREE_MONGO_RAWTREE_API_KEY | RawTree API key
rawtree.table_prefix | RAWTREE_MONGO_RAWTREE_TABLE_PREFIX | Table name prefix (default: mongo_)
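The environment variable names in the table follow a consistent pattern: `RAWTREE_MONGO_` plus the setting path in upper case, with dots replaced by underscores. The sketch below derives names this way and falls back to the file value when the variable is unset. `envName` and `resolve` are hypothetical helpers. Whether the connector computes the names like this or lists them explicitly is an assumption.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// envName derives the override variable for a dotted config key:
// "rawtree.api_key" -> "RAWTREE_MONGO_RAWTREE_API_KEY".
func envName(key string) string {
	return "RAWTREE_MONGO_" + strings.ToUpper(strings.ReplaceAll(key, ".", "_"))
}

// resolve returns the environment override if it is set, else the file value.
func resolve(key, fileValue string) string {
	if v, ok := os.LookupEnv(envName(key)); ok {
		return v
	}
	return fileValue
}

func main() {
	for _, k := range []string{"mongodb.uri", "mongodb.database", "rawtree.table_prefix"} {
		fmt.Println(k, "->", envName(k))
	}
	fmt.Println("table prefix:", resolve("rawtree.table_prefix", "mongo_"))
}
```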

Collection Filtering

mongodb:
  collections:
    # Explicit list
    include: ["orders", "users", "products"]
    # OR regex-based
    include_regex: "^(orders|users)"
    exclude_regex: "^system\\."
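The sketch below shows one plausible reading of these filter rules in Go. An exclusion match always wins. An explicit `include` list takes precedence over `include_regex`. With no include rule, every non-excluded collection is watched. `Filter`, `newFilter`, and `Allows` are hypothetical names, and the precedence order is an assumption rather than documented behavior.

```go
package main

import (
	"fmt"
	"regexp"
	"slices"
)

// Filter holds the collection rules; nil regexes mean "not set".
type Filter struct {
	Include      []string
	IncludeRegex *regexp.Regexp
	ExcludeRegex *regexp.Regexp
}

// newFilter compiles the optional patterns; an empty string means "not set".
func newFilter(include []string, includeRe, excludeRe string) (Filter, error) {
	f := Filter{Include: include}
	var err error
	if includeRe != "" {
		if f.IncludeRegex, err = regexp.Compile(includeRe); err != nil {
			return Filter{}, err
		}
	}
	if excludeRe != "" {
		if f.ExcludeRegex, err = regexp.Compile(excludeRe); err != nil {
			return Filter{}, err
		}
	}
	return f, nil
}

// Allows reports whether a collection should be replicated.
func (f Filter) Allows(coll string) bool {
	if f.ExcludeRegex != nil && f.ExcludeRegex.MatchString(coll) {
		return false // exclusion always wins
	}
	if len(f.Include) > 0 {
		return slices.Contains(f.Include, coll) // explicit list takes precedence
	}
	if f.IncludeRegex != nil {
		return f.IncludeRegex.MatchString(coll)
	}
	return true // no include rule: watch everything not excluded
}

func main() {
	f, err := newFilter(nil, `^(orders|users)`, `^system\.`)
	if err != nil {
		panic(err)
	}
	for _, c := range []string{"orders", "users_archive", "products", "system.views"} {
		fmt.Printf("%-14s replicated=%v\n", c, f.Allows(c))
	}
}
```

Running `main` reveals a detail worth checking in real configs. Because `^(orders|users)` has no trailing `$`, it also matches `users_archive`. Use `^(orders|users)$` to match only those exact names.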

Event Format

Each row sent to RawTree includes CDC metadata:

{
  "_rt_op": "insert",
  "_rt_ts": "2024-01-15T10:30:00Z",
  "_rt_ns": "ecommerce.orders",
  "_rt_id": "507f1f77bcf86cd799439011",
  "_rt_doc": { ... }
}
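The envelope maps naturally onto a Go struct with JSON tags. `Row`, `exampleRow`, and `encode` are hypothetical names. The field meanings (operation, event time, namespace, document `_id`, full document) are inferred from the field names, and the `status` field in the example document is made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Row models the CDC envelope sent to RawTree.
type Row struct {
	Op  string         `json:"_rt_op"`  // insert, update, replace, or delete
	TS  time.Time      `json:"_rt_ts"`  // event time, serialized as RFC 3339
	NS  string         `json:"_rt_ns"`  // source namespace: database.collection
	ID  string         `json:"_rt_id"`  // document _id as a string
	Doc map[string]any `json:"_rt_doc"` // document body
}

// exampleRow builds the row shown above, with an illustrative document body.
func exampleRow() Row {
	return Row{
		Op:  "insert",
		TS:  time.Date(2024, 1, 15, 10, 30, 0, 0, time.UTC),
		NS:  "ecommerce.orders",
		ID:  "507f1f77bcf86cd799439011",
		Doc: map[string]any{"status": "paid"},
	}
}

// encode serializes a row to the JSON sent over the HTTP API.
func encode(r Row) (string, error) {
	b, err := json.Marshal(r)
	return string(b), err
}

func main() {
	s, err := encode(exampleRow())
	if err != nil {
		panic(err)
	}
	fmt.Println(s)
}
```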

Metrics

Prometheus metrics are exposed on :9090/metrics:

Metric | Description
connector_events_total | Events processed (by collection, op)
connector_events_sent_total | Events sent to RawTree (by table)
connector_batch_latency_seconds | HTTP POST latency
connector_lag_seconds | CDC lag (by collection)
connector_snapshot_progress | Snapshot rows scanned
connector_errors_total | Errors (by type)
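The documentation does not say exactly how `connector_lag_seconds` is computed. One plausible definition, sketched below, subtracts the seconds part of the last processed event's cluster time from the wall-clock time, tracked per collection. `lagSeconds` and `lagByCollection` are hypothetical names. A real implementation would publish these values through a Prometheus gauge vector rather than a plain map.

```go
package main

import (
	"fmt"
	"time"
)

// lagSeconds returns how far the connector trails the cluster, given the
// current Unix time and the seconds field of the last processed event's
// cluster time. Negative values (clock skew) are clamped to zero.
func lagSeconds(nowUnix int64, eventSeconds uint32) float64 {
	lag := nowUnix - int64(eventSeconds)
	if lag < 0 {
		return 0
	}
	return float64(lag)
}

// lagByCollection keeps the latest lag per collection, the same shape a
// gauge labeled by collection would expose.
type lagByCollection map[string]float64

func (l lagByCollection) observe(coll string, nowUnix int64, eventSeconds uint32) {
	l[coll] = lagSeconds(nowUnix, eventSeconds)
}

func main() {
	lags := lagByCollection{}
	now := time.Now().Unix()
	lags.observe("orders", now, uint32(now-3)) // event from 3 seconds ago
	lags.observe("users", now, uint32(now))    // caught up
	fmt.Println(lags)
}
```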

Health Endpoints

  • GET :9090/healthz — always 200 once started
  • GET :9090/readyz — 200 when streaming, 503 during init
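A minimal sketch of the two probes using only `net/http`, assuming an atomic flag that flips once CDC streaming starts. `ready`, `readyStatus`, and `newHealthMux` are hypothetical names. Keeping `/healthz` unconditional while `/readyz` reports 503 during initialization lets an orchestrator such as Kubernetes tell a connector that is still initializing apart from one that has hung.

```go
package main

import (
	"log"
	"net/http"
	"sync/atomic"
)

// ready flips to true once the change stream is open and streaming.
var ready atomic.Bool

// readyStatus maps the streaming state to the /readyz status code.
func readyStatus(streaming bool) int {
	if streaming {
		return http.StatusOK
	}
	return http.StatusServiceUnavailable
}

func newHealthMux() *http.ServeMux {
	mux := http.NewServeMux()
	// Liveness: the process is up and serving HTTP.
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
	// Readiness: 503 until streaming begins.
	mux.HandleFunc("/readyz", func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(readyStatus(ready.Load()))
	})
	return mux
}

func main() {
	// Elsewhere, after the change stream opens: ready.Store(true)
	log.Fatal(http.ListenAndServe(":9090", newHealthMux()))
}
```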