Skip to content
Merged
11 changes: 3 additions & 8 deletions cmd/pgwatch/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,13 @@
// all monitored DBs when missing. Main
// usage - pg_stat_statements
// [$PW_TRY_CREATE_LISTED_EXTS_IF_MISSING]
//
// --create-helpers Create helper database objects from
// metric definitions
// [$PW_CREATE_HELPERS]
// Metrics:
//
// -m, --metrics= File or folder of YAML files with
// metrics definitions [$PW_METRICS]
// --create-helpers Create helper database objects from
// metric definitions
// [$PW_CREATE_HELPERS]
// --direct-os-stats Extract OS related psutil statistics
// not via PL/Python wrappers but
// directly on host
// [$PW_DIRECT_OS_STATS]
// --instance-level-cache-max-seconds= Max allowed staleness for instance
// level metric data shared between DBs
// of an instance. Affects 'continuous'
Expand Down
4 changes: 4 additions & 0 deletions cmd/pgwatch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ func main() {
return
}

if opts.Metrics.DirectOSStats {
logger.Warning("--direct-os-stats flag is deprecated, direct OS access is now applied automatically for relevant metrics if on same host.")
}

reaper := reaper.NewReaper(mainCtx, opts)

if _, err = webserver.Init(mainCtx, opts.WebUI, webui.WebUIFs, opts.MetricsReaderWriter,
Expand Down
24 changes: 24 additions & 0 deletions cmd/pgwatch/main_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ metrics:
test_metric:
sqls:
11: select (extract(epoch from now()) * 1e9)::int8 as epoch_ns, 42 as test_metric
cpu_load:
sqls:
11: select 'metric fetched via PostgreSQL not OS'
`), 0644))

pg, err := initTestContainer()
Expand All @@ -66,6 +69,7 @@ metrics:
is_enabled: true
custom_metrics:
test_metric: 60
cpu_load: 60
`), 0644))

// Mock Exit to capture exit code
Expand Down Expand Up @@ -127,4 +131,24 @@ metrics:
<-mainCtx.Done() // Wait for main to finish
assert.Equal(t, cmdopts.ExitCodeWebUIError, gotExit, "port should be busy and fail to bind")
})

t.Run("non-direct os stats", func(t *testing.T) {
os.Remove(jsonSink) // truncate output file
os.Args = []string{
"pgwatch",
"--sources", sourcesYaml,
"--metrics", metricsYaml,
"--sink", "jsonfile://" + jsonSink,
"--web-disable",
}

go main()
<-time.After(5 * time.Second) // Wait for main to fetch metric
cancel()
<-mainCtx.Done() // Wait for main to finish

datab, err := os.ReadFile(jsonSink)
assert.NoError(t, err)
assert.Contains(t, string(datab), "metric fetched via PostgreSQL not OS")
})
}
5 changes: 0 additions & 5 deletions docs/reference/cli_env.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,6 @@ It reads the configuration from the specified sources and metrics, then begins c
Postgres URI, YAML file or folder of YAML files with metrics definitions.
ENV: `$PW_METRICS`

- `--direct-os-stats`

Extract OS related psutil statistics not via PL/Python wrappers but directly on host.
ENV: `$PW_DIRECT_OS_STATS`

- `--instance-level-cache-max-seconds=`

Max allowed staleness for instance level metric data shared between DBs of an instance. Set to 0 to disable (default: 30).
Expand Down
14 changes: 7 additions & 7 deletions docs/tutorial/preparing_databases.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,21 +190,21 @@ pgwatch metric print-init psutil_cpu psutil_mem psutil_disk psutil_disk_io_total
based on the Linux distro / Kernel version used, so small
adjustments might be needed there (e.g. to remove a non-existent
column). Minimum usable Kernel version required is 3.3.
- When running the gatherer locally one can enable the `--direct-os-stats`
parameter to signal that we can fetch the data for the default `psutil*` metrics
directly from OS counters. If direct OS fetching fails though, the
fallback is still to try via PL/Python wrappers.
- When pgwatch runs on the same host as a monitored source,
it auto-detects this and tries to fetch the default `psutil*`
metrics data directly from OS counters. If this direct OS fetch fails,
it falls back to using PL/Python wrappers.
- In rare cases when some "helpers" have been installed, and when
doing a binary PostgreSQL upgrade at some later point in time via
`pg_upgrade`, this could result in error messages
thrown. Then just drop those failing helpers on the "to be
upgraded" cluster and re-create them after the upgrade process.

!!! Info
If despite all the warnings you still want to run the pgwatch
with a sufficient user account (e.g. a superuser) you can also
If, despite all the warnings, you still want to run pgwatch
with a sufficient user account (e.g., a superuser), you can
use the `--create-helpers` parameter to automatically create all
needed helper functions in the monitored databases.
needed helper functions in the monitored databases on startup.

## Different source types explained

Expand Down
9 changes: 6 additions & 3 deletions internal/db/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,16 @@ func MarshallParamToJSONB(v any) any {
func IsClientOnSameHost(conn PgxIface) (bool, error) {
ctx := context.Background()

// Step 1: Check connection type using SQL
// Option 1: Check if connection type is unix socket
var isUnixSocket bool
err := conn.QueryRow(ctx, "SELECT COALESCE(inet_client_addr(), inet_server_addr()) IS NULL").Scan(&isUnixSocket)
if err != nil || isUnixSocket {
return isUnixSocket, err
}

// Step 2: Retrieve unique cluster identifier
// Option 2: Compare system identifier from file system vs cluster identifier

// Step 1: Retrieve unique cluster identifier
var dataDirectory string
if err := conn.QueryRow(ctx, "SHOW data_directory").Scan(&dataDirectory); err != nil {
return false, err
Expand All @@ -89,7 +91,7 @@ func IsClientOnSameHost(conn PgxIface) (bool, error) {
return false, err
}

// Step 3: Compare system identifier from file system
// Step 2: Retrieve system identifier from file system
pgControlFile := filepath.Join(dataDirectory, "global", "pg_control")
file, err := os.Open(pgControlFile)
if err != nil {
Expand All @@ -102,5 +104,6 @@ func IsClientOnSameHost(conn PgxIface) (bool, error) {
return false, err
}

// Compare file system identifier and cluster identifier
return fileSystemIdentifier == systemIdentifier, nil
}
2 changes: 1 addition & 1 deletion internal/metrics/cmdopts.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
// CmdOpts specifies metric command-line options
type CmdOpts struct {
Metrics string `short:"m" long:"metrics" mapstructure:"metrics" description:"Postgres URI or path to YAML file with metrics definitions" env:"PW_METRICS"`
DirectOSStats bool `long:"direct-os-stats" mapstructure:"direct-os-stats" description:"Extract OS related psutil statistics not via PL/Python wrappers but directly on host" env:"PW_DIRECT_OS_STATS"`
DirectOSStats bool `hidden:"true" long:"direct-os-stats" mapstructure:"direct-os-stats" description:"Extract OS related psutil statistics not via PL/Python wrappers but directly on host" env:"PW_DIRECT_OS_STATS"`
InstanceLevelCacheMaxSeconds int64 `long:"instance-level-cache-max-seconds" mapstructure:"instance-level-cache-max-seconds" description:"Max allowed staleness for instance level metric data shared between DBs of an instance. Set to 0 to disable" env:"PW_INSTANCE_LEVEL_CACHE_MAX_SECONDS" default:"30"`
EmergencyPauseTriggerfile string `long:"emergency-pause-triggerfile" mapstructure:"emergency-pause-triggerfile" description:"When the file exists no metrics will be temporarily fetched" env:"PW_EMERGENCY_PAUSE_TRIGGERFILE" default:"/tmp/pgwatch-emergency-pause"`
}
Expand Down
4 changes: 2 additions & 2 deletions internal/reaper/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ const (

var directlyFetchableOSMetrics = []string{metricPsutilCPU, metricPsutilDisk, metricPsutilDiskIoTotal, metricPsutilMem, metricCPULoad}

func IsDirectlyFetchableMetric(metric string) bool {
return slices.Contains(directlyFetchableOSMetrics, metric)
func IsDirectlyFetchableMetric(md *sources.SourceConn, metric string) bool {
return slices.Contains(directlyFetchableOSMetrics, metric) && md.IsClientOnSameHost()
}

func (r *Reaper) FetchStatsDirectlyFromOS(ctx context.Context, md *sources.SourceConn, metricName string) (*metrics.MeasurementEnvelope, error) {
Expand Down
7 changes: 5 additions & 2 deletions internal/reaper/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@ var (
func TestReaper_FetchStatsDirectlyFromOS(t *testing.T) {
a := assert.New(t)
r := &Reaper{}
conn, _ := pgxmock.NewPool()
conn, _ := pgxmock.NewPool(pgxmock.QueryMatcherOption(pgxmock.QueryMatcherEqual))
expq := conn.ExpectQuery("SELECT COALESCE(inet_client_addr(), inet_server_addr()) IS NULL")
expq.Times(uint(len(directlyFetchableOSMetrics)))
md := &sources.SourceConn{Conn: conn}
for _, m := range directlyFetchableOSMetrics {
a.True(IsDirectlyFetchableMetric(m), "Expected %s to be directly fetchable", m)
expq.WillReturnRows(pgxmock.NewRows([]string{"is_unix_socket"}).AddRow(true))
a.True(IsDirectlyFetchableMetric(md, m), "Expected %s to be directly fetchable", m)
a.NotPanics(func() {
_, _ = r.FetchStatsDirectlyFromOS(context.Background(), md, m)
})
Expand Down
9 changes: 4 additions & 5 deletions internal/reaper/reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,14 +312,13 @@ func (r *Reaper) reapMetricMeasurements(ctx context.Context, md *sources.SourceC

var metricStoreMessages *metrics.MeasurementEnvelope

// 1st try local overrides for some metrics if operating in push mode
if r.Metrics.DirectOSStats && IsDirectlyFetchableMetric(metricName) {
metricStoreMessages, err = r.FetchStatsDirectlyFromOS(ctx, md, metricName)
if err != nil {
t1 := time.Now()
// 1st try local overrides for system metrics if operating on the same host
if IsDirectlyFetchableMetric(md, metricName) {
if metricStoreMessages, err = r.FetchStatsDirectlyFromOS(ctx, md, metricName); err != nil {
l.WithError(err).Errorf("Could not read metric directly from OS")
}
}
t1 := time.Now()
if metricStoreMessages == nil {
metricStoreMessages, err = r.FetchMetric(ctx, md, metricName)
}
Expand Down
6 changes: 6 additions & 0 deletions internal/sources/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ func (md *SourceConn) GetMetricInterval(name string) float64 {
return md.Metrics[name]
}

// IsClientOnSameHost checks if the pgwatch client is running on the same host as the PostgreSQL server
func (md *SourceConn) IsClientOnSameHost() bool {
ok, err := db.IsClientOnSameHost(md.Conn)
return ok && err == nil
}

// SetDatabaseName sets the database name in the connection config for resolved databases
func (md *SourceConn) SetDatabaseName(name string) {
if err := md.ParseConfig(); err != nil {
Expand Down
Loading