Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 0 additions & 62 deletions .github/workflows/flow_schema_migrations.yml

This file was deleted.

1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ All notable changes to this project will be documented in this file.
- Tools
- Add `IsRetryableFunc` field to `RetryOptions` for configurable retry criteria in the Solana JSON-RPC client; add `"rate limited"` string match and RPC code `-32429` to the default implementation
- Telemetry
- Add shared `telemetry/migrations` package with goose-based ClickHouse schema migrations for all telemetry services; add `CLICKHOUSE_RUN_MIGRATIONS` env var to flow-enricher and gnmi-writer for on-startup schema migration ([#3460](https://github.com/malbeclabs/doublezero/pull/3460))
- Add optional TLS support to state-ingest server via `--tls-cert-file` and `--tls-key-file` flags; when set, the server listens on both HTTP (`:8080`) and HTTPS (`:8443`) simultaneously
- Remove `--additional-child-probes` CLI flag from telemetry-agent; child geoprobe discovery now relies entirely on the onchain Geolocation program
- Add BGP status submitter: on each tick, reads BGP socket state from the device namespace, maps each activated user to their tunnel peer IP, and submits `SetUserBGPStatus` onchain; supports a configurable down grace period and periodic keepalive refresh; enabled via `--bgp-status-enable` with `--bgp-status-interval`, `--bgp-status-refresh-interval`, and `--bgp-status-down-grace-period` flags
Expand Down
16 changes: 16 additions & 0 deletions telemetry/flow-enricher/cmd/flow-enricher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/malbeclabs/doublezero/config"
"github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability"
enricher "github.com/malbeclabs/doublezero/telemetry/flow-enricher/internal/flow-enricher"
"github.com/malbeclabs/doublezero/telemetry/migrations"
"github.com/malbeclabs/doublezero/tools/solana/pkg/rpc"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -59,6 +60,21 @@ func main() {
if *stdoutOutput {
chWriter = enricher.NewStdoutWriter()
} else {
if os.Getenv("CLICKHOUSE_RUN_MIGRATIONS") == "true" {
if err := migrations.RunMigrations(
os.Getenv("CLICKHOUSE_ADDR"),
getEnvOrDefault("CLICKHOUSE_DB", "default"),
os.Getenv("CLICKHOUSE_USER"),
os.Getenv("CLICKHOUSE_PASS"),
os.Getenv("CLICKHOUSE_TLS_DISABLED") != "true",
logger,
); err != nil {
logger.Error("error running clickhouse migrations", "error", err)
os.Exit(1)
}
logger.Info("clickhouse migrations applied")
}

chOpts := []enricher.ClickhouseOption{}
if os.Getenv("CLICKHOUSE_TLS_DISABLED") == "true" {
chOpts = append(chOpts, enricher.WithTLSDisabled(true))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability"
"github.com/malbeclabs/doublezero/telemetry/migrations"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"

Expand All @@ -34,7 +35,7 @@ var (
chUser = "enricher"
chPassword = "clickhouse"
chDbname = "default"
chTable = "flows_integration"
chTable = "flows"

rpBroker string
rpUser = "enricher"
Expand All @@ -53,13 +54,35 @@ func setupClickhouseContainer(ctx context.Context) error {
clickhouse.WithUsername(chUser),
clickhouse.WithPassword(chPassword),
clickhouse.WithDatabase(chDbname),
clickhouse.WithInitScripts(
filepath.Join("fixtures", "create_table_device_ifindex.sql"),
filepath.Join("fixtures", "insert_device_ifindex.sql"),
filepath.Join("fixtures", "create_table_flows.sql"),
),
)
return err
if err != nil {
return err
}

chConn, err := clickhouseCtr.ConnectionHost(ctx)
if err != nil {
return err
}

if err = migrations.RunMigrations(chConn, chDbname, chUser, chPassword, false, logger); err != nil {
return fmt.Errorf("error running clickhouse migrations: %w", err)
}

db, err := migrations.NewDB(chConn, chDbname, chUser, chPassword, false)
if err != nil {
return err
}
defer db.Close()

insertSQL, err := os.ReadFile(filepath.Join("fixtures", "insert_device_ifindex.sql"))
if err != nil {
return fmt.Errorf("error reading insert_device_ifindex.sql: %w", err)
}
if _, err = db.ExecContext(ctx, string(insertSQL)); err != nil {
return fmt.Errorf("error seeding device_ifindex: %w", err)
}

return nil
}

func setupRedpandaContainer(ctx context.Context) error {
Expand Down Expand Up @@ -192,7 +215,7 @@ func TestFlowEnrichment(t *testing.T) {
WithClickhouseDB(chDbname),
WithClickhouseUser(chUser),
WithClickhousePassword(chPassword),
WithClickhouseTable("flows_integration"),
WithClickhouseTable(chTable),
WithTLSDisabled(true),
WithClickhouseLogger(logger),
WithClickhouseMetrics(NewClickhouseMetrics(reg)),
Expand Down Expand Up @@ -297,7 +320,7 @@ func TestFlowEnrichment(t *testing.T) {
payload := readPcap(t, "./fixtures/sflow_ingress_user_traffic.pcap")

f := &flow.FlowSample{
ReceiveTimestamp: &timestamppb.Timestamp{Seconds: 1625243456, Nanos: 0},
ReceiveTimestamp: timestamppb.Now(),
FlowPayload: payload,
}

Expand Down

This file was deleted.

33 changes: 0 additions & 33 deletions telemetry/gnmi-writer/clickhouse/bgp_neighbors.sql

This file was deleted.

24 changes: 0 additions & 24 deletions telemetry/gnmi-writer/clickhouse/interface_ifindex.sql

This file was deleted.

37 changes: 0 additions & 37 deletions telemetry/gnmi-writer/clickhouse/interface_state.sql

This file was deleted.

33 changes: 0 additions & 33 deletions telemetry/gnmi-writer/clickhouse/isis_adjacencies.sql

This file was deleted.

Loading
Loading