
web3tea/curio-sentinel

curio-sentinel

A CLI tool and library for monitoring database changes in Curio clusters.

It builds on YugabyteDB's Change Data Capture (CDC) feature and provides a simple way to monitor changes in the database.


Standalone Program Usage

When used as a standalone program, curio-sentinel can be run directly from the command line without writing any additional code.

Installation

To build the standalone program from source, run the following command in the repository root:

make build

Configuration

Create a config.toml configuration file with the following settings:

log_level = "debug"  # Optional: debug, info, warn, error

[capturer]
# Optional: customize slot and publication names
# slot_name = "curio_sentinel_slot"
# publication_name = "curio_sentinel_pub"
# drop_slot_on_stop = true
# drop_publication_on_stop = true

[capturer.database]
hosts = ["localhost"]
port = 5433
username = "yugabyte"
password = "yugabyte"
database = "your_database"

[sink]
type = "console"  # Currently supports "console" output

Running

# Run with default config path (config.toml)
./curio-sentinel run

# Run with custom config path
./curio-sentinel run -c your-config.toml

Once started, the program will monitor database changes according to your configuration, filter and transform events as specified, and output them to the configured sink (default: console).

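Insert

[Screenshot: console output for an insert event]

Update

[Screenshot: console output for an update event]

Delete

[Screenshot: console output for a delete event]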

Library Usage

Basic Usage

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/web3tea/curio-sentinel/capturer"
)

func main() {
	// Create a capturer config
	config := capturer.Config{
		Database: capturer.DatabaseConfig{
			Hosts:    []string{"localhost"},
			Port:     5433,
			Username: "yugabyte",
			Password: "yugabyte",
			Database: "your_curio_database",
		},
		// Tables to monitor; leave empty to monitor all tables
		Tables: []string{"curio.harmony_task"},
	}

	// Create a capturer (named capt so it does not shadow Go's built-in cap)
	capt := capturer.NewYugabyteCapturer(config, nil)
	if err := capt.Start(); err != nil {
		log.Fatalf("Failed to start capturer: %v", err)
	}

	// Set up signal handling
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

	// Run the event processing in a goroutine
	go func() {
		for event := range capt.Events() {
			fmt.Println("------------------------------------------------------")
			fmt.Printf("Event: %s on %s.%s at %s\n", event.Type, event.Schema, event.Table, event.Timestamp)

			// Print the row images relevant to the operation type
			switch event.Type {
			case capturer.OperationTypeInsert:
				printJSON("After", event.After)
			case capturer.OperationTypeDelete:
				printJSON("Before", event.Before)
			case capturer.OperationTypeUpdate:
				printJSON("Before", event.Before)
				printJSON("After", event.After)
			}

			if len(event.PrimaryKey) > 0 {
				printJSON("PrimaryKey", event.PrimaryKey)
			}

			// Acknowledge the event once it has been handled
			capt.ACK(context.Background(), event.LSN)
		}
	}()

	// Wait for termination signal
	<-sigCh
	fmt.Println("Stopping capturer...")

	// This will block until Stop completes
	// It may take a while to clean up the slot and publication in the database
	capt.Stop()
	fmt.Println("Capturer stopped, exiting")
}

func printJSON(label string, data any) {
	jsonData, err := json.MarshalIndent(data, "", "  ")
	if err != nil {
		fmt.Printf("Error marshaling %s to JSON: %v\n", label, err)
	} else {
		fmt.Printf("%s: %s\n", label, string(jsonData))
	}
}

Terminal Output Example

When you run the code above and then insert, update, and delete a row in the monitored table, the terminal displays output similar to the following:

------------------------------------------------------
Event: INSERT on curio.harmony_task at 2025-01-27 23:30:45.123456
After: {
  "added_by": 1,
  "id": 48821,
  "initiated_by": null,
  "name": "WinPost",
  "owner_id": null,
  "posted_time": "2025-01-27T23:30:10.36213Z",
  "previous_task": null,
  "retries": 0,
  "update_time": "2025-01-27T23:30:10.36213Z"
}
PrimaryKey: {
  "id": 48821
}
------------------------------------------------------
Event: UPDATE on curio.harmony_task at 2025-01-27 23:31:12.654321
Before: {
  "added_by": 1,
  "id": 48821,
  "initiated_by": null,
  "name": "WinPost",
  "owner_id": null,
  "posted_time": "2025-01-27T23:30:10.36213Z",
  "previous_task": null,
  "retries": 0,
  "update_time": "2025-01-27T23:30:10.36213Z"
}
After: {
  "added_by": 1,
  "id": 48821,
  "initiated_by": null,
  "name": "WinPost",
  "owner_id": 1,
  "posted_time": "2025-01-27T23:30:10.36213Z",
  "previous_task": null,
  "retries": 0,
  "update_time": "2025-01-27T23:30:10.36213Z"
}
PrimaryKey: {
  "id": 48821
}
------------------------------------------------------
Event: DELETE on curio.harmony_task at 2025-01-27 23:33:21.987654
Before: {
  "added_by": 1,
  "id": 48821,
  "initiated_by": null,
  "name": "WinPost",
  "owner_id": 1,
  "posted_time": "2025-01-27T23:30:10.36213Z",
  "previous_task": null,
  "retries": 0,
  "update_time": "2025-01-27T23:30:10.36213Z"
}
PrimaryKey: {
  "id": 48821
}
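The columns in these examples appear in alphabetical order. That is a property of Go's `encoding/json`, which writes map keys in sorted order when marshaling, so it is what `printJSON` produces if the row images are maps (an assumption about how `event.Before` and `event.After` are typed). The sketch below shows the effect with a plain `map[string]any`; `marshalRow` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// marshalRow encodes a row image as compact JSON. encoding/json writes map
// keys in sorted order, independent of the order they were inserted in.
func marshalRow(row map[string]any) (string, error) {
	out, err := json.Marshal(row)
	if err != nil {
		return "", err
	}
	return string(out), nil
}

func main() {
	// Keys are listed out of order on purpose; Go map iteration order is
	// unspecified anyway.
	row := map[string]any{
		"update_time": "2025-01-27T23:30:10.36213Z",
		"name":        "WinPost",
		"owner_id":    nil,
		"id":          48821,
		"added_by":    1,
	}
	s, err := marshalRow(row)
	if err != nil {
		panic(err)
	}
	fmt.Println(s)
	// prints {"added_by":1,"id":48821,"name":"WinPost","owner_id":null,"update_time":"2025-01-27T23:30:10.36213Z"}
}
```

If table column order mattered, the row would need an ordered representation, such as a slice of column/value pairs, instead of a map.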

Replica Identity and Output Differences

PostgreSQL and YugabyteDB use a table's replica identity setting to determine which column values are recorded for UPDATE and DELETE operations (in PostgreSQL, the old-row data written to the write-ahead log, or WAL). This directly affects the content and completeness of the data captured by CDC tools such as curio-sentinel.

For more details, refer to the PostgreSQL Replica Identity documentation and YugabyteDB Change Data Capture documentation.

Replica Identity Types

YugabyteDB supports four Replica Identity modes:

  1. CHANGE (default)
  2. DEFAULT
  3. FULL
  4. NOTHING

The INDEX replica identity, available in PostgreSQL, is not supported in YugabyteDB.
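Because the amount of old-row data in UPDATE and DELETE events depends on the replica identity, code that tracks rows across events may find it more robust to key them on the primary key than on `Before`. The sketch below builds a deterministic key from a schema, table, and primary-key map. `rowKey` is hypothetical, and it assumes `event.PrimaryKey` is a `map[string]any` that is populated under the replica identity in use, as it is in the examples above.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// rowKey builds a deterministic identifier for a row from its schema, table,
// and primary-key columns. Columns are sorted so composite keys yield the
// same string regardless of map iteration order. Values are formatted with
// %v, so values containing "|" or "=" could collide; this is only a sketch.
func rowKey(schema, table string, pk map[string]any) string {
	cols := make([]string, 0, len(pk))
	for col := range pk {
		cols = append(cols, col)
	}
	sort.Strings(cols)

	var b strings.Builder
	b.WriteString(schema)
	b.WriteString(".")
	b.WriteString(table)
	for _, col := range cols {
		fmt.Fprintf(&b, "|%s=%v", col, pk[col])
	}
	return b.String()
}

func main() {
	fmt.Println(rowKey("curio", "harmony_task", map[string]any{"id": 48821}))
	// prints curio.harmony_task|id=48821
}
```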

Output Comparison with Different Replica Identity Settings

FULL Mode (Complete Row Identification)

In FULL mode, UPDATE and DELETE events will include all column values for the row.

-- Set the table to FULL mode; this takes effect only if run before the replication slot is created
ALTER TABLE curio.harmony_task REPLICA IDENTITY FULL;

UPDATE operation output example:

Event: UPDATE on curio.harmony_task at 2025-01-27 23:45:41.781039 +0800 CST
Before: {
  "added_by": 1,
  "id": 50337,
  "initiated_by": null,
  "name": "WinPost",
  "owner_id": null,
  "posted_time": "2025-01-27T23:45:41.907074Z",
  "previous_task": null,
  "retries": 0,
  "update_time": "2025-01-27T23:45:41.907074Z"
}
After: {
  "added_by": 1,
  "id": 50337,
  "initiated_by": null,
  "name": "WinPost",
  "owner_id": 1,
  "posted_time": "2025-01-27T23:45:41.907074Z",
  "previous_task": null,
  "retries": 0,
  "update_time": "2025-01-27T23:45:41.907074Z"
}
PrimaryKey: {
  "id": 50337
}

DELETE operation output example:

Event: DELETE on curio.harmony_task at 2025-01-27 23:45:55.803845 +0800 CST
Before: {
  "added_by": 1,
  "id": 50337,
  "initiated_by": null,
  "name": "WinPost",
  "owner_id": 1,
  "posted_time": "2025-01-27T23:45:41.907074Z",
  "previous_task": null,
  "retries": 0,
  "update_time": "2025-01-27T23:45:41.907074Z"
}
PrimaryKey: {
  "id": 50337
}

YugabyteDB Logical Replication Library for Go

Examples: yugabyte.go

Check out yblogrepl for more information.
