MongoDB CDC Connector
A Go-based connector that replicates data from MongoDB to RawTree using Change Streams for CDC (Change Data Capture).
How it works
The connector works in two phases: an initial snapshot followed by continuous change data capture (CDC). During the snapshot phase, the connector first records the current MongoDB cluster time and then performs a full Find() scan of each collection, so writes made while the scan is running are picked up later by the change stream instead of being lost. Every existing document is sent to RawTree as a synthetic insert event, and the recorded cluster time is saved as the starting point for CDC.
Once the snapshot completes, the connector opens a MongoDB Change Stream starting at the saved cluster time and receives insert, update, replace, and delete events in real time. Events are batched and delivered to RawTree through its HTTP API, and resume tokens are periodically persisted to disk for fault tolerance. After a crash or restart, the connector resumes from the last confirmed checkpoint. If the MongoDB oplog has rolled past the saved token, the connector automatically runs a new snapshot to recover safely.
Each MongoDB collection maps to its own RawTree table, named with the configured table prefix (mongo_ by default) followed by the collection name. For example, ecommerce.orders becomes mongo_orders and ecommerce.users becomes mongo_users.
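The naming rule reduces to a small string operation. This hypothetical tableName helper drops the database part of the namespace and prepends the prefix; it does not attempt any sanitization the real connector might apply, for example to collection names that themselves contain dots.

```go
package main

import (
	"fmt"
	"strings"
)

// tableName maps a "database.collection" namespace to a RawTree table name.
// MongoDB database names cannot contain dots, so everything after the first
// dot is the collection name.
func tableName(prefix, namespace string) string {
	coll := namespace
	if i := strings.IndexByte(namespace, '.'); i >= 0 {
		coll = namespace[i+1:]
	}
	return prefix + coll
}

func main() {
	fmt.Println(tableName("mongo_", "ecommerce.orders")) // mongo_orders
	fmt.Println(tableName("mongo_", "ecommerce.users"))  // mongo_users
}
```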
Features
- Full snapshot: Initial replication of existing collections
- CDC streaming: Real-time change capture via MongoDB Change Streams
- Multi-collection: Watch multiple collections with regex filtering
- At-least-once delivery: Checkpoint-based recovery with resume tokens
- Auto table creation: RawTree tables are created automatically
- Backpressure: Configurable batch sizes and flush intervals
- Observability: Prometheus metrics, health/readiness endpoints
The connector requires a MongoDB 4.0+ replica set or sharded cluster (Change Streams are not available on a standalone server) and, for building from source, Go 1.23+.
Quick Start
Clone the repository and install dependencies (Go 1.23+, mongoimport, mongosh, and direnv).
```bash
git clone https://github.com/rawtreedb/rawtree-mongo-connector.git
cd rawtree-mongo-connector
```

Docker Compose
```bash
# Edit docker-compose.yaml with your RawTree API key
docker compose up
```

Binary
```bash
make build
./bin/rawtree-mongo-connector --config config.example.yaml
```

Docker
```bash
make docker
docker run -v ./config.yaml:/etc/connector/config.yaml rawtree-mongo-connector:dev
```

Helm (Kubernetes)
```bash
helm install mongo-connector deploy/helm/rawtree-mongo-connector \
  --set config.mongodb.database=mydb \
  --set config.rawtree.endpoint=https://api.rawtree.com \
  --set secrets.mongodbUri="mongodb+srv://..." \
  --set secrets.rawtreeApiKey="rt_..."
```

Configuration
See config.example.yaml for all options. Key settings:
| Setting | Env Override | Description |
|---|---|---|
| mongodb.uri | RAWTREE_MONGO_MONGODB_URI | MongoDB connection string |
| mongodb.database | RAWTREE_MONGO_MONGODB_DATABASE | Database to watch |
| rawtree.endpoint | RAWTREE_MONGO_RAWTREE_ENDPOINT | RawTree API URL |
| rawtree.api_key | RAWTREE_MONGO_RAWTREE_API_KEY | RawTree API key |
| rawtree.table_prefix | RAWTREE_MONGO_RAWTREE_TABLE_PREFIX | Table name prefix (default: mongo_) |
Collection Filtering
```yaml
mongodb:
  collections:
    # Explicit list
    include: ["orders", "users", "products"]
    # OR regex-based
    include_regex: "^(orders|users)"
    exclude_regex: "^system\\."
```

Event Format
Each row sent to RawTree includes CDC metadata:
```json
{
  "_rt_op": "insert",
  "_rt_ts": "2024-01-15T10:30:00Z",
  "_rt_ns": "ecommerce.orders",
  "_rt_id": "507f1f77bcf86cd799439011",
  "_rt_doc": { ... }
}
```

Metrics
Prometheus metrics are exposed at :9090/metrics:

| Metric | Description |
|---|---|
| connector_events_total | Events processed (by collection and operation) |
| connector_events_sent_total | Events sent to RawTree (by table) |
| connector_batch_latency_seconds | Latency of batch HTTP POST requests to RawTree |
| connector_lag_seconds | CDC lag (by collection) |
| connector_snapshot_progress | Documents scanned during the snapshot |
| connector_errors_total | Errors (by type) |
Health Endpoints
- GET :9090/healthz: always returns 200 once the process has started
- GET :9090/readyz: returns 200 while streaming, 503 during initialization
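The two endpoints differ only in when they report success. This sketch uses the standard net/http and httptest packages and assumes a hypothetical ready flag that the connector would set on entering the streaming phase; it exercises the handlers in-process instead of binding :9090.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync/atomic"
)

// ready is a hypothetical flag flipped to true once CDC streaming begins.
var ready atomic.Bool

// newMux wires up liveness (/healthz) and readiness (/readyz) handlers.
func newMux() *http.ServeMux {
	mux := http.NewServeMux()
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK) // alive as soon as the server is up
	})
	mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
		if ready.Load() {
			w.WriteHeader(http.StatusOK)
			return
		}
		w.WriteHeader(http.StatusServiceUnavailable) // still initializing
	})
	return mux
}

// status issues an in-process GET and returns the response code.
func status(h http.Handler, path string) int {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code
}

func main() {
	mux := newMux()
	fmt.Println("/healthz", status(mux, "/healthz")) // 200
	fmt.Println("/readyz", status(mux, "/readyz"))   // 503 during init
	ready.Store(true)
	fmt.Println("/readyz", status(mux, "/readyz")) // 200 once streaming
}
```

In a real process the same mux would be served with http.ListenAndServe(":9090", mux). The split lets a liveness check pass during a long snapshot while a readiness check keeps reporting 503 until streaming begins.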