Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions managed/cmd/pmm-managed/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,6 @@ func main() { //nolint:gocognit,maintidx,cyclop
clickhouseAddrF := kingpin.Flag("clickhouse-addr", "Clickhouse database address").Default("127.0.0.1:9000").Envar("PMM_CLICKHOUSE_ADDR").String()
clickhouseUsernameF := kingpin.Flag("clickhouse-username", "Clickhouse database user").Default("default").Envar("PMM_CLICKHOUSE_USER").String()
clickhousePasswordF := kingpin.Flag("clickhouse-password", "Clickhouse database user password").Default("clickhouse").Envar("PMM_CLICKHOUSE_PASSWORD").String()

watchtowerHostF := kingpin.Flag("watchtower-host", "Watchtower host").Default("http://watchtower:8080").Envar("PMM_WATCHTOWER_HOST").URL()

// Nomad garbage collection flags
Expand Down Expand Up @@ -824,15 +823,18 @@ func main() { //nolint:gocognit,maintidx,cyclop
grafanadb.DSN.DB = "grafana"
grafanadb.DSN.Params = q.Encode()

chURI := url.URL{
Scheme: "tcp",
User: url.UserPassword(*clickhouseUsernameF, *clickhousePasswordF),
Host: *clickhouseAddrF,
Path: *clickHouseDatabaseF,
chParams, err := models.NewClickHouseParams(
*clickhouseAddrF,
*clickHouseDatabaseF,
*clickhouseUsernameF,
*clickhousePasswordF,
)
if err != nil {
l.Panicf("cannot load clickhouse params: %+v", err)
}

qanDB := ds.QanDBSelect
qanDB.DSN = chURI.String()
qanDB.DSN = chParams.URL().String()

ds.VM.Address = *victoriaMetricsURLF

Expand Down Expand Up @@ -882,7 +884,7 @@ func main() { //nolint:gocognit,maintidx,cyclop

cleaner := clean.New(db)
externalRules := vmalert.NewExternalRules()
vmdb, err := victoriametrics.NewVictoriaMetrics(*victoriaMetricsConfigF, db, vmParams, haService)
vmdb, err := victoriametrics.NewVictoriaMetrics(*victoriaMetricsConfigF, db, vmParams, chParams, haService)
if err != nil {
l.Panicf("VictoriaMetrics service problem: %+v", err)
}
Expand Down Expand Up @@ -1022,7 +1024,7 @@ func main() { //nolint:gocognit,maintidx,cyclop
versionCache := versioncache.New(db, versioner)

dumpService := dump.New(db, &dump.URLs{
ClickhouseURL: chURI.String(),
ClickhouseURL: chParams.URL().String(),
VMURL: *victoriaMetricsURLF,
})

Expand Down
2 changes: 1 addition & 1 deletion managed/models/agent_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,7 @@ func (a *Agent) ExporterURL(q *reform.Querier) (string, error) {
username := pointer.GetString(a.Username)
password := pointer.GetString(a.Password)

host := "127.0.0.1"
host := localhost
if !a.ExporterOptions.PushMetrics {
node, err := FindNodeByID(q, *a.RunsOnNodeID)
if err != nil {
Expand Down
72 changes: 72 additions & 0 deletions managed/models/clickhouse_params.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright (C) 2023 Percona LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

package models

import (
"fmt"
"net"
"net/url"
"strconv"
)

// ClickHouseParams represents ClickHouse server params.
type ClickHouseParams struct {
url *url.URL
}

// ExternalClickHouse returns true if ClickHouse is configured externally.
func (p *ClickHouseParams) ExternalClickHouse() bool {
return !internalAddr(p.url.Hostname())
}

// URL returns the ClickHouse URL.
func (p *ClickHouseParams) URL() *url.URL {
u := *p.url
return &u
}

// NewClickHouseParams returns validated ClickHouse configuration params,
// or an error if any required field is missing or malformed.
func NewClickHouseParams(addr, dbName, dbUsername, dbPassword string) (*ClickHouseParams, error) {
if addr == "" {
return nil, fmt.Errorf("addr is required")
}
host, port, err := net.SplitHostPort(addr)
if err != nil {
return nil, fmt.Errorf("invalid addr %q: %w", addr, err)
}
if host == "" {
return nil, fmt.Errorf("invalid addr %q: empty host", addr)
}
if _, err := strconv.ParseUint(port, 10, 16); err != nil {
return nil, fmt.Errorf("invalid port in addr %q: %w", addr, err)
}
if dbName == "" {
return nil, fmt.Errorf("database name is required")
}
if dbUsername == "" {
return nil, fmt.Errorf("username is required")
}

return &ClickHouseParams{
url: &url.URL{
Scheme: "tcp",
User: url.UserPassword(dbUsername, dbPassword),
Host: addr,
Path: dbName,
},
}, nil
}
94 changes: 94 additions & 0 deletions managed/models/clickhouse_params_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright (C) 2023 Percona LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

package models

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestNewClickHouseParams(t *testing.T) {
t.Run("valid", func(t *testing.T) {
p, err := NewClickHouseParams("127.0.0.1:9000", "pmm", "default", "clickhouse")
require.NoError(t, err)
assert.Equal(t, "tcp://default:clickhouse@127.0.0.1:9000/pmm", p.URL().String())
})

t.Run("valid empty password", func(t *testing.T) {
_, err := NewClickHouseParams("127.0.0.1:9000", "pmm", "default", "")
require.NoError(t, err)
})

errCases := []struct {
name string
addr string
dbName string
dbUsername string
dbPassword string
wantErrSub string
}{
{"empty addr", "", "pmm", "default", "clickhouse", "addr is required"},
{"missing port", "127.0.0.1", "pmm", "default", "clickhouse", "invalid addr"},
{"empty host", ":9000", "pmm", "default", "clickhouse", "empty host"},
{"non numeric port", "localhost:abc", "pmm", "default", "clickhouse", "invalid port"},
{"port out of range", "localhost:99999", "pmm", "default", "clickhouse", "invalid port"},
{"empty db name", "127.0.0.1:9000", "", "default", "clickhouse", "database name is required"},
{"empty username", "127.0.0.1:9000", "pmm", "", "clickhouse", "username is required"},
}
for _, tc := range errCases {
t.Run(tc.name, func(t *testing.T) {
_, err := NewClickHouseParams(tc.addr, tc.dbName, tc.dbUsername, tc.dbPassword)
require.Error(t, err)
assert.ErrorContains(t, err, tc.wantErrSub)
})
}
}

func TestCHParamsExternalClickHouse(t *testing.T) {
cases := []struct {
name string
addr string
want bool
}{
{"loopback", "127.0.0.1:9000", false},
{"localhost", "localhost:9000", false},
{"external host", "ch-01.test.net:9000", true},
{"wildcard", "0.0.0.0:9000", true},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
p, err := NewClickHouseParams(tc.addr, "pmm", "default", "clickhouse")
require.NoError(t, err)
assert.Equal(t, tc.want, p.ExternalClickHouse())
})
}
}

func TestCHParamsURL(t *testing.T) {
t.Run("simple", func(t *testing.T) {
p, err := NewClickHouseParams("127.0.0.1:9000", "pmm", "default", "clickhouse")
require.NoError(t, err)
assert.Equal(t, "tcp://default:clickhouse@127.0.0.1:9000/pmm", p.URL().String())
})

t.Run("password with special chars", func(t *testing.T) {
p, err := NewClickHouseParams("127.0.0.1:9000", "pmm", "default", "p@ss/word")
require.NoError(t, err)
assert.Equal(t, "tcp://default:p%40ss%2Fword@127.0.0.1:9000/pmm", p.URL().String())
})
}
10 changes: 10 additions & 0 deletions managed/models/common_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,13 @@ func (e *InvalidArgumentError) Error() string {
func NewInvalidArgumentError(format string, a ...interface{}) *InvalidArgumentError {
return &InvalidArgumentError{Details: fmt.Sprintf(format, a...)}
}

// localhost is the IPv4 loopback address used by PMM Server's
// co-located services.
const localhost = "127.0.0.1"

// internalAddr reports whether host refers to PMM's built-in,
// co-located services.
func internalAddr(host string) bool {
return host == localhost || host == "localhost"
}
4 changes: 2 additions & 2 deletions managed/models/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -1565,7 +1565,7 @@ func setupPMMServerHAAgents(q *reform.Querier, params SetupDBParams) error {

node, err := createNodeWithID(q, nodeID, GenericNodeType, &CreateNodeParams{
NodeName: params.HANodeID,
Address: "127.0.0.1",
Address: localhost,
CustomLabels: labels,
IsPMMServerNode: true,
})
Expand Down Expand Up @@ -1596,7 +1596,7 @@ func setupPMMServerAgents(q *reform.Querier, params SetupDBParams) error {
// create PMM Server Node and associated Agents
node, err := createNodeWithID(q, PMMServerNodeID, GenericNodeType, &CreateNodeParams{
NodeName: "pmm-server",
Address: "127.0.0.1",
Address: localhost,
IsPMMServerNode: true,
})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion managed/models/victoriametrics_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (vmp *VictoriaMetricsParams) loadVMAlertParams() error {

// ExternalVM returns true if VictoriaMetrics is configured to run externally.
func (vmp *VictoriaMetricsParams) ExternalVM() bool {
return vmp.url.Hostname() != "127.0.0.1"
return !internalAddr(vmp.url.Hostname())
}

// URL returns the base URL for VictoriaMetrics.
Expand Down
5 changes: 3 additions & 2 deletions managed/services/victoriametrics/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func AddScrapeConfigs(l *logrus.Entry, cfg *config.Config, q *reform.Querier, //
}
switch {
case pushMetrics:
paramsHost = "127.0.0.1"
paramsHost = localhost
case agent.PMMAgentID != nil:
pmmAgentNode = &models.Node{NodeID: pointer.GetString(pmmAgent.RunsOnNodeID)}
if err = q.Reload(pmmAgentNode); err != nil {
Expand Down Expand Up @@ -272,7 +272,8 @@ func addInternalServicesToScrape(s models.MetricsResolutions, svc *Service, pmmS
scrapeConfigForQANAPI2(s.MR, pmmServerNodeName),
}

if svc.params.ExternalVM() {
if svc.chParams.ExternalClickHouse() {
svc.l.Warnf("Skip internal ClickHouse scrape config, ClickHouse is configured to run externally.")
return cfg
}

Expand Down
10 changes: 5 additions & 5 deletions managed/services/victoriametrics/scrape_configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func scrapeConfigForClickhouse(mr time.Duration, pmmServerNodeName string) *conf
MetricsPath: "/metrics",
ServiceDiscoveryConfig: config.ServiceDiscoveryConfig{
StaticConfigs: []*config.Group{{
Targets: []string{"127.0.0.1:9363"},
Targets: []string{localhost + ":9363"},
Labels: map[string]string{"instance": pmmServerNodeName},
}},
},
Expand All @@ -70,7 +70,7 @@ func scrapeConfigForGrafana(interval time.Duration, pmmServerNodeName string) *c
MetricsPath: "/graph/metrics",
ServiceDiscoveryConfig: config.ServiceDiscoveryConfig{
StaticConfigs: []*config.Group{{
Targets: []string{"127.0.0.1:3000"},
Targets: []string{localhost + ":3000"},
Labels: map[string]string{"instance": pmmServerNodeName},
}},
},
Expand All @@ -85,7 +85,7 @@ func scrapeConfigForPMMManaged(interval time.Duration, pmmServerNodeName string)
MetricsPath: "/debug/metrics",
ServiceDiscoveryConfig: config.ServiceDiscoveryConfig{
StaticConfigs: []*config.Group{{
Targets: []string{"127.0.0.1:7773"},
Targets: []string{localhost + ":7773"},
Labels: map[string]string{"instance": pmmServerNodeName},
}},
},
Expand All @@ -100,7 +100,7 @@ func scrapeConfigForQANAPI2(interval time.Duration, pmmServerNodeName string) *c
MetricsPath: "/debug/metrics",
ServiceDiscoveryConfig: config.ServiceDiscoveryConfig{
StaticConfigs: []*config.Group{{
Targets: []string{"127.0.0.1:9933"},
Targets: []string{localhost + ":9933"},
Labels: map[string]string{"instance": pmmServerNodeName},
}},
},
Expand All @@ -116,7 +116,7 @@ func scrapeConfigForNomadServer(resolution time.Duration, pmmServerNodeName stri
Scheme: "https",
ServiceDiscoveryConfig: config.ServiceDiscoveryConfig{
StaticConfigs: []*config.Group{{
Targets: []string{"127.0.0.1:4646"},
Targets: []string{localhost + ":4646"},
Labels: map[string]string{"instance": pmmServerNodeName},
}},
},
Expand Down
19 changes: 16 additions & 3 deletions managed/services/victoriametrics/victoriametrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ const (
victoriametricsDir = "/srv/victoriametrics"
victoriametricsDataDir = "/srv/victoriametrics/data"
dirPerm = os.FileMode(0o775)

localhost = "127.0.0.1"
)

var checkFailedRE = regexp.MustCompile(`(?s)cannot unmarshal data: (.+)`)
Expand All @@ -60,26 +62,37 @@ type Service struct {
baseURL *url.URL
client *http.Client

params *models.VictoriaMetricsParams
params *models.VictoriaMetricsParams
chParams *models.ClickHouseParams

l *logrus.Entry
reloadCh chan struct{}
haService haService
}

// NewVictoriaMetrics creates new VictoriaMetrics service.
func NewVictoriaMetrics(scrapeConfigPath string, db *reform.DB, params *models.VictoriaMetricsParams, haService haService) (*Service, error) {
func NewVictoriaMetrics(
scrapeConfigPath string,
db *reform.DB,
params *models.VictoriaMetricsParams,
chParams *models.ClickHouseParams,
haService haService,
) (*Service, error) {
u, err := url.Parse(params.URL())
if err != nil {
return nil, err
}
if chParams == nil {
Comment thread
maxkondr marked this conversation as resolved.
return nil, fmt.Errorf("ClickHouse params is required")
}

return &Service{
scrapeConfigPath: scrapeConfigPath,
db: db,
baseURL: u,
client: &http.Client{}, // TODO instrument with utils/irt; see vmalert package https://jira.percona.com/browse/PMM-7229
params: params,
chParams: chParams,
l: logrus.WithField("component", "victoriametrics"),
reloadCh: make(chan struct{}, 1),
haService: haService,
Expand Down Expand Up @@ -435,7 +448,7 @@ func scrapeConfigForVMAlert(interval time.Duration, pmmServerNodeName string) *c
ServiceDiscoveryConfig: config.ServiceDiscoveryConfig{
StaticConfigs: []*config.Group{
{
Targets: []string{"127.0.0.1:8880"},
Targets: []string{localhost + ":8880"},
Labels: map[string]string{"instance": pmmServerNodeName},
},
},
Expand Down
Loading
Loading