From d8390d967e3498b67e08bbcc2cb0a533adcdd489 Mon Sep 17 00:00:00 2001 From: Jack Ding Date: Wed, 4 Feb 2026 18:55:04 -0500 Subject: [PATCH] handle event socket broken pipe with automatic reconnection When cloud-event-proxy restarts, linuxptp-daemon's Unix socket connection to /cloud-native/events.sock breaks, causing persistent "broken pipe" errors and silent loss of PTP events. Add robust reconnection logic with exponential backoff that automatically re-establishes the event socket connection when a broken pipe is detected: - Introduce ReconnectWithBackoff utility in pkg/utils with configurable retry attempts, exponential backoff, and context-based cancellation for clean shutdown responsiveness. - Move net.Conn ownership into EventHandler with thread-safe getConn/setConn accessors that automatically close replaced connections, preventing resource leaks. - Add writeLogToSocket helper that encapsulates write-reconnect-retry logic for a single log line, replacing ad-hoc error handling at each write site. - Add dial timeouts (DialContext/DialTimeout) and write deadlines (SetWriteDeadline) to all socket operations to prevent indefinite blocking if the listener is unresponsive or the socket buffer is full. - Separate stdout printing from socket writing in ProcessEvents so all logs are always printed locally regardless of socket state. - Implement channel-based broken pipe signaling (brokenPipeCh) so background goroutines (clock class ticker, TBC announce) can notify the main ProcessEvents loop to reconnect without blocking. - Serialize concurrent reconnection attempts via reconnectMu to prevent multiple goroutines from opening duplicate connections. - Fix multiple data races: protect clkSyncState map and clockClass/ clockAccuracy fields with snapshot-under-lock patterns, resolve deadlock in updateBCState/announceClockClass by separating lock-holding from I/O operations, and guard LeadingClockData access in TBC goroutines. 
- Refactor EmitClockSyncLogs, EmitPortRoleLogs, EmitClockClass, and EmitProcessStatusLogs to use the EventHandler's managed connection with built-in reconnection support, including reconnect-on-nil-conn for reliable log emission after event proxy restarts. - Add IsBrokenPipe helper to detect EPIPE, ECONNRESET, ECONNREFUSED, and ENOTCONN errors including those wrapped in net.OpError. - Add comprehensive unit tests for reconnection backoff, broken pipe detection, connection management, and socket write/reconnect behavior. Signed-off-by: Jack Ding --- cmd/main.go | 2 +- pkg/daemon/daemon.go | 32 +++ pkg/daemon/ready.go | 28 +- pkg/event/event.go | 401 ++++++++++++++++++++++++---- pkg/event/event_socket_test.go | 465 +++++++++++++++++++++++++++++++++ pkg/event/event_tbc.go | 124 +++++---- pkg/utils/clock_class_log.go | 66 +++++ pkg/utils/reconnect.go | 82 ++++++ pkg/utils/reconnect_test.go | 168 ++++++++++++ 9 files changed, 1268 insertions(+), 100 deletions(-) create mode 100644 pkg/event/event_socket_test.go create mode 100644 pkg/utils/clock_class_log.go create mode 100644 pkg/utils/reconnect.go create mode 100644 pkg/utils/reconnect_test.go diff --git a/cmd/main.go b/cmd/main.go index e845630b..69584977 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -159,7 +159,7 @@ func main() { daemon.StartMetricsServer("0.0.0.0:9091") } - daemon.StartReadyServer("0.0.0.0:8081", tracker) + daemon.StartReadyServer("0.0.0.0:8081", tracker, stdoutToSocket) // Wait for one ticker interval before loading the profile // This allows linuxptp-daemon connection to the cloud-event-proxy container to diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index 7a17adb1..75bcc8ef 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -85,6 +85,17 @@ var ptpTmpFiles = []string{ pmcSocketName, } +const socketDialTimeout = 5 * time.Second + +func dialSocket() (net.Conn, error) { + c, err := net.DialTimeout("unix", eventSocket, socketDialTimeout) + if err != nil { + glog.Errorf("error 
trying to connect to event socket") + time.Sleep(connectionRetryInterval) + } + return c, err +} + // ProcessManager manages a set of ptpProcess // which could be ptp4l, phc2sys or timemaster. // Processes in ProcessManager will be started @@ -171,6 +182,27 @@ func (p *ProcessManager) UpdateSynceConfig(config *synce.Relations) { } +// EmitProcessStatusLogs emits process status logs using the EventHandler's +// managed connection with reconnection support. +func (p *ProcessManager) EmitProcessStatusLogs() { + for _, proc := range p.process { + status := PtpProcessUp + if proc.Stopped() { + status = PtpProcessDown + } + p.ptpEventHandler.EmitProcessStatusLog(proc.name, proc.configName, status) + } +} + +// EmitClockClassLogs re-emits clock class via the EventHandler's managed connection. +func (p *ProcessManager) EmitClockClassLogs() { + for _, proc := range p.process { + if proc.name == ptp4lProcessName { + p.ptpEventHandler.EmitClockClass(proc.configName) + } + } +} + type tBCProcessAttributes struct { controlledPortsConfigFile string // Time receiver interface name for T-BC clock monitoring diff --git a/pkg/daemon/ready.go b/pkg/daemon/ready.go index 329d25c9..7038c7ba 100644 --- a/pkg/daemon/ready.go +++ b/pkg/daemon/ready.go @@ -76,10 +76,36 @@ func (h readyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } -func StartReadyServer(bindAddress string, tracker *ReadyTracker) { +type metricHandler struct { + tracker *ReadyTracker +} + +func (h metricHandler) ServeHTTP(w http.ResponseWriter, _ *http.Request) { + if isReady, _ := h.tracker.Ready(); !isReady { + w.WriteHeader(http.StatusNoContent) + return + } + w.WriteHeader(http.StatusOK) + + go func() { + eventHandler := h.tracker.processManager.ptpEventHandler + eventHandler.EmitClockSyncLogs() + eventHandler.EmitPortRoleLogs() + + processManager := h.tracker.processManager + go processManager.EmitProcessStatusLogs() + go processManager.EmitClockClassLogs() + }() +} + +// StartReadyServer ... starts the HTTP server that serves /ready and, when serveInitMetrics is set, /emit-logs. 
+func StartReadyServer(bindAddress string, tracker *ReadyTracker, serveInitMetrics bool) { glog.Info("Starting Ready Server") mux := http.NewServeMux() mux.Handle("/ready", readyHandler{tracker: tracker}) + if serveInitMetrics { + mux.Handle("/emit-logs", metricHandler{tracker: tracker}) + } go utilwait.Until(func() { err := http.ListenAndServe(bindAddress, mux) if err != nil { diff --git a/pkg/event/event.go b/pkg/event/event.go index f8e8bd34..770287f8 100644 --- a/pkg/event/event.go +++ b/pkg/event/event.go @@ -1,6 +1,7 @@ package event import ( + "context" "fmt" "net" "sort" @@ -145,7 +146,12 @@ const ( PTP_NOTSET PTPState = "-2" ) -const connectionRetryInterval = 1 * time.Second +const ( + // socketDialTimeout is the maximum time to wait for a single dial attempt to the event socket. + socketDialTimeout = 5 * time.Second + // socketWriteTimeout is the maximum time to wait for a write to the event socket. + socketWriteTimeout = 5 * time.Second +) type clockSyncState struct { state PTPState @@ -166,6 +172,10 @@ type EventHandler struct { stdoutToSocket bool processChannel <-chan EventChannel closeCh chan bool + brokenPipeCh chan struct{} // signals broken pipe from background goroutines to trigger reconnection + conn net.Conn // event socket connection, guarded by connMu + connMu sync.Mutex // separate mutex for conn to avoid deadlocks with embedded sync.Mutex + reconnectMu sync.Mutex // serializes reconnection attempts to prevent leaked connections data map[string][]*Data offsetMetric *prometheus.GaugeVec clockMetric *prometheus.GaugeVec @@ -177,6 +187,27 @@ type EventHandler struct { frequencyTraceable bool // will be tru if synce is traceable ReduceLog bool // reduce logs for every announce LeadingClockData *LeadingClockParams + portRole map[string]map[string]*PortRoleEvent +} + +// getConn returns the current event socket connection under lock. 
+func (e *EventHandler) getConn() net.Conn { + e.connMu.Lock() + defer e.connMu.Unlock() + return e.conn +} + +// setConn replaces the current event socket connection under lock, closing the previous one if it exists. +func (e *EventHandler) setConn(c net.Conn) { + e.connMu.Lock() + oldConn := e.conn + e.conn = c + e.connMu.Unlock() + if oldConn != nil && oldConn != c { + if err := oldConn.Close(); err != nil { + glog.Warningf("failed to close old event handler connection: %v", err) + } + } } // EventChannel .. event channel to subscriber to events @@ -213,6 +244,7 @@ func Init(nodeName string, stdOutToSocket bool, socketName string, processChanne stdoutSocket: socketName, stdoutToSocket: stdOutToSocket, closeCh: closeCh, + brokenPipeCh: make(chan struct{}, 1), processChannel: processChannel, data: map[string][]*Data{}, clockMetric: clockMetric, @@ -227,6 +259,7 @@ func Init(nodeName string, stdOutToSocket bool, socketName string, processChanne downstreamTimeProperties: &protocol.TimePropertiesDS{}, downstreamParentDataSet: &protocol.ParentDataSet{}, }, + portRole: map[string]map[string]*PortRoleEvent{}, } StateRegisterer = NewStateNotifier() @@ -563,42 +596,153 @@ func (e *EventHandler) hasMetric(name string) (*prometheus.GaugeVec, bool) { return nil, false } +// AnnounceClockClass announces clock class changes to the event handler and writes to the connection. +// Broken pipe errors are handled internally via signalBrokenPipe. 
+func (e *EventHandler) AnnounceClockClass(clockClass fbprotocol.ClockClass, clockAcc fbprotocol.ClockAccuracy, cfgName string, clockType ClockType) { + e.announceClockClass(clockClass, clockAcc, cfgName) + select { + case clockClassRequestCh <- ClockClassRequest{ + cfgName: cfgName, + clockClass: clockClass, + clockType: clockType, + clockAccuracy: clockAcc, + }: + default: + glog.Warning("clock class request busy updating previous request, will try on next event") + } +} + +func (e *EventHandler) announceClockClass(clockClass fbprotocol.ClockClass, clockAcc fbprotocol.ClockAccuracy, cfgName string) { + e.Lock() + e.setClockClassLocked(clockClass, clockAcc) + e.Unlock() + + e.emitClockClass(clockClass, cfgName) +} + +// setClockClassLocked updates the clock class and accuracy fields. +// Caller must hold e.Lock(). +func (e *EventHandler) setClockClassLocked(clockClass fbprotocol.ClockClass, clockAcc fbprotocol.ClockAccuracy) { + e.clockClass = clockClass + e.clockAccuracy = clockAcc +} + +// emitClockClass writes the clock class to the socket and updates the metric. +// Must NOT be called while holding e.Lock(). +func (e *EventHandler) emitClockClass(clockClass fbprotocol.ClockClass, cfgName string) { + brokenPipe := utils.EmitClockClass(e.getConn(), PTP4lProcessName, cfgName, clockClass) + if !e.stdoutToSocket && e.clockClassMetric != nil { + e.clockClassMetric.With(prometheus.Labels{ + "process": PTP4lProcessName, "config": cfgName, "node": e.nodeName}).Set(float64(clockClass)) + } + if brokenPipe { + e.signalBrokenPipe() + } +} + +// signalBrokenPipe sends a non-blocking signal on brokenPipeCh to notify +// the ProcessEvents loop that the socket connection needs reconnection. +func (e *EventHandler) signalBrokenPipe() { + select { + case e.brokenPipeCh <- struct{}{}: + default: + } +} + +// reconnectEventSocket dials a new connection (unless one is already set) using +// the shared reconnection utility with exponential backoff. 
+func (e *EventHandler) reconnectEventSocket() bool { + e.reconnectMu.Lock() + defer e.reconnectMu.Unlock() + + if e.getConn() != nil { + return true + } + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + select { + case <-e.closeCh: + cancel() + case <-ctx.Done(): + } + }() + defer cancel() + dialer := net.Dialer{Timeout: socketDialTimeout} + newConn := utils.ReconnectWithBackoff(ctx, + func() (net.Conn, error) { return dialer.DialContext(ctx, "unix", e.stdoutSocket) }, + utils.DefaultReconnectConfig(), + ) + if newConn != nil { + e.setConn(newConn) + return true + } + return false +} + +// writeLogToSocket writes a single log line to the event socket. +// If the write fails, it attempts to reconnect and retry once. +func (e *EventHandler) writeLogToSocket(l string) bool { + conn := e.getConn() + if conn == nil { + return false + } + if err := conn.SetWriteDeadline(time.Now().Add(socketWriteTimeout)); err != nil { + glog.Warningf("Failed to set write deadline: %v", err) + } + if _, err := conn.Write([]byte(l)); err != nil { + glog.Errorf("Write error for %q: %v", l, err) + e.setConn(nil) + if !e.reconnectEventSocket() { + glog.Warning("Reconnect failed after write error, skipping remaining socket writes; will retry on next event") + return false + } + retryConn := e.getConn() + if retryConn == nil { + glog.Warning("Connection is nil after reconnect, skipping retry") + return false + } + if deadlineErr := retryConn.SetWriteDeadline(time.Now().Add(socketWriteTimeout)); deadlineErr != nil { + glog.Warningf("Failed to set write deadline on retry: %v", deadlineErr) + } + if _, retryErr := retryConn.Write([]byte(l)); retryErr != nil { + glog.Errorf("Write failed again after reconnect for %q: %v", l, retryErr) + e.setConn(nil) + return false + } + } + return true +} + // ProcessEvents ... 
process events to generate new events func (e *EventHandler) ProcessEvents() { - var c net.Conn - var err error redialClockClass := true - retryCount := 0 + defer func() { - if e.stdoutToSocket && c != nil { - if err = c.Close(); err != nil { - glog.Errorf("closing connection returned error %s", err) - } + if e.stdoutToSocket { + e.setConn(nil) // closes the connection if present } }() var lastClockState PTPState -connect: - select { - case <-e.closeCh: - return - default: - if e.stdoutToSocket { - c, err = net.Dial("unix", e.stdoutSocket) - if err != nil { - // reduce log spam - if retryCount == 0 || retryCount%5 == 0 { - glog.Errorf("waiting for event socket, retrying %s", err) - } - retryCount = (retryCount + 1) % 6 - time.Sleep(connectionRetryInterval) - goto connect + + // Establish initial connection to the event socket using exponential backoff. + // Retries indefinitely until connected or the handler is shutting down. + if e.stdoutToSocket { + for !e.reconnectEventSocket() { + // reconnectEventSocket returns false on shutdown or exhausted retries; + // check for shutdown before retrying + select { + case <-e.closeCh: + return + default: + glog.Warning("Initial connection to event socket failed, retrying in 1 second...") + time.Sleep(1 * time.Second) } - retryCount = 0 } } if redialClockClass { - go func(eConn *net.Conn) { + go func() { defer func() { if err := recover(); err != nil { glog.Errorf("restored from clock class update: %s", err) @@ -609,27 +753,55 @@ connect: for { select { case clk := <-clockClassRequestCh: - e.UpdateClockClass(c, clk) cfgName = clk.cfgName + if clk.clockType != BC { + e.UpdateClockClass(clk) + } else { + e.Lock() + e.clockClass = clk.clockClass + e.clockAccuracy = clk.clockAccuracy + e.Unlock() + } + case <-e.closeCh: return - case <-classTicker.C: // send clock class event 60 secs interval + case <-classTicker.C: + e.Lock() + clkSnapshot := make(map[string]fbprotocol.ClockClass, len(e.clkSyncState)) + for k, v := range 
e.clkSyncState { + clkSnapshot[k] = v.clockClass + } + e.Unlock() + for clkCfgName, clockClass := range clkSnapshot { + parts := strings.SplitN(clkCfgName, ".", 2) + if len(parts) >= 2 { + clkCfgName = "ptp4l." + strings.Join(parts[1:], ".") + } + if clockClass == 0 { + continue + } + if clkCfgName == cfgName { + cfgName = "" + } + if brokenPipe := utils.EmitClockClass(e.getConn(), PTP4lProcessName, clkCfgName, clockClass); brokenPipe { + glog.Warning("Broken pipe detected in clock class ticker, signaling reconnection") + e.signalBrokenPipe() + break + } + } + if cfgName != "" { - clockClassOut := fmt.Sprintf("%s[%d]:[%s] CLOCK_CLASS_CHANGE %d\n", PTP4l, time.Now().Unix(), cfgName, e.clockClass) - if e.stdoutToSocket { - if c != nil { - _, err := c.Write([]byte(clockClassOut)) - if err != nil { - glog.Errorf("failed to write class change event %s", err.Error()) - } - } else { - glog.Errorf("failed to write class change event, connection is nil") - } + e.Lock() + currentClockClass := e.clockClass + e.Unlock() + if brokenPipe := utils.EmitClockClass(e.getConn(), PTP4lProcessName, cfgName, currentClockClass); brokenPipe { + glog.Warning("Broken pipe detected in clock class ticker, signaling reconnection") + e.signalBrokenPipe() } } } } - }(&c) + }() redialClockClass = false } // call all monitoring candidates; verify every 5 secs for any new @@ -669,7 +841,9 @@ connect: } } } + e.Lock() delete(e.clkSyncState, event.CfgName) // delete the clkSyncState + e.Unlock() e.outOfSpec = false e.frequencyTraceable = false } @@ -695,20 +869,29 @@ connect: if event.ClockType == GM { dataDetails = e.addEvent(event) // Computes GM state + e.Lock() clockState = e.updateGMState(event.CfgName) // right now if GPS offset || mode is bad then consider source lost if e.clkSyncState[event.CfgName] != nil { e.clkSyncState[event.CfgName].sourceLost = event.OutOfSpec } + e.Unlock() if clockState.state != PTP_LOCKED { // here update nmea status if _, ok := event.Values[NMEA_STATUS]; ok { 
event.Values[NMEA_STATUS] = 0 } } } else { // T-BC or T-TSC + e.Lock() event = e.convergeConfig(event) dataDetails = e.addEvent(event) - clockState = e.updateBCState(event, c) + var needsTTSCAnnounce bool + clockState, needsTTSCAnnounce = e.updateBCState(event) + e.Unlock() + // Perform TTSC clock class announcement I/O after releasing the lock + if needsTTSCAnnounce { + e.emitClockClass(clockState.clockClass, event.CfgName) + } } logDataValues = dataDetails.logData if event.WriteToLog && logDataValues != "" { @@ -796,21 +979,32 @@ connect: } // Not SYNC-E if len(logOut) > 0 { + // Always print all logs to stdout regardless of socket state + for _, l := range logOut { + fmt.Printf("%s", l) + } if e.stdoutToSocket { - for _, l := range logOut { - fmt.Printf("%s", l) - _, err = c.Write([]byte(l)) - if err != nil { - glog.Errorf("Write %s error %s:", l, err) - goto connect + if e.getConn() == nil { + glog.Error("No connection available, attempting reconnect") + if !e.reconnectEventSocket() { + glog.Warning("Reconnect failed, skipping socket writes; will retry on next event") } } - } else { for _, l := range logOut { - fmt.Printf("%s", l) + if !e.writeLogToSocket(l) { + break + } } } } + case <-e.brokenPipeCh: + // A background goroutine detected a broken pipe on the event socket. + // Clear the broken connection and reconnect. + glog.Info("Broken pipe signal received, reconnecting event socket") + e.setConn(nil) + if !e.reconnectEventSocket() { + glog.Warning("Reconnect failed after broken pipe signal; will retry on next event") + } case <-e.closeCh: return } @@ -1041,7 +1235,7 @@ func (e *EventHandler) addEvent(event EventChannel) *DataDetails { } // UpdateClockClass ... 
update clock class -func (e *EventHandler) UpdateClockClass(c net.Conn, clk ClockClassRequest) { +func (e *EventHandler) UpdateClockClass(clk ClockClassRequest) { classErr, clockClass, clockAccuracy := e.updateClockClass(clk.cfgName, clk.clockClass, clk.clockType, clk.clockAccuracy, PMCGMGetter, PMCGMSetter) glog.Infof("received %s,%v,%s,%v", clk.cfgName, clk.clockClass, clk.clockType, clk.clockAccuracy) @@ -1049,14 +1243,19 @@ func (e *EventHandler) UpdateClockClass(c net.Conn, clk ClockClassRequest) { glog.Errorf("error updating clock class %s", classErr) } else { glog.Infof("updated clock class for last clock class %d to %d with clock accuracy %d", clk.clockClass, clockClass, clockAccuracy) + e.Lock() e.clockClass = clockClass e.clockAccuracy = clockAccuracy - clockClassOut := fmt.Sprintf("%s[%d]:[%s] CLOCK_CLASS_CHANGE %d\n", PTP4l, time.Now().Unix(), clk.cfgName, clockClass) + e.Unlock() + clockClassOut := utils.GetClockClassLogMessage(PTP4lProcessName, clk.cfgName, clockClass) if e.stdoutToSocket { + c := e.getConn() if c != nil { - _, err := c.Write([]byte(clockClassOut)) - if err != nil { + if _, err := c.Write([]byte(clockClassOut)); err != nil { glog.Errorf("failed to write class change event %s", err.Error()) + if utils.IsBrokenPipe(err) { + e.signalBrokenPipe() + } } } else { glog.Errorf("failed to write class change event, connection is nil") @@ -1075,3 +1274,103 @@ func getMetricName(valueType ValueType) string { } return string(valueType) } + +// PortRoleEvent stores port role change information for re-emission. 
+type PortRoleEvent struct { + Raw string +} + +// SetPortRole saves the port role change event +func (e *EventHandler) SetPortRole(cfgName, portName string, raw string) { + if e.portRole == nil { + e.portRole = make(map[string]map[string]*PortRoleEvent) + } + if _, ok := e.portRole[cfgName]; !ok { + e.portRole[cfgName] = make(map[string]*PortRoleEvent) + } + e.portRole[cfgName][portName] = &PortRoleEvent{Raw: raw} +} + +// EmitClockSyncLogs emits the clock sync state logs +func (e *EventHandler) EmitClockSyncLogs() { + glog.Info("Re-emitting metrics logs for event-proxy as requested") + + if e.getConn() == nil { + glog.Warning("Connection is nil, attempting to reconnect before emitting clock sync logs") + if !e.reconnectEventSocket() { + glog.Error("Failed to emit clock sync logs, reconnect failed") + return + } + } + e.Lock() + logs := make([]string, 0, len(e.clkSyncState)) + for _, syncState := range e.clkSyncState { + if syncState.clkLog != "" { + logs = append(logs, syncState.clkLog) + } + } + e.Unlock() + + for _, l := range logs { + glog.Info(l) + if !e.writeLogToSocket(l) { + glog.Warning("Broken pipe detected while emitting clock sync logs, stopping.") + break + } + } +} + +// EmitPortRoleLogs emits the port role logs +func (e *EventHandler) EmitPortRoleLogs() { + if e.getConn() == nil { + glog.Warning("Connection is nil, attempting to reconnect before emitting port role logs") + if !e.reconnectEventSocket() { + glog.Error("Failed to emit port state logs, reconnect failed") + return + } + } + glog.Info("Re-emitting metrics logs for event-proxy as requested") + + e.Lock() + type portRoleEntry struct { + raw string + } + var entries []portRoleEntry + for _, ports := range e.portRole { + for _, portEvent := range ports { + if portEvent != nil { + entries = append(entries, portRoleEntry{raw: portEvent.Raw}) + } + } + } + e.Unlock() + + for _, entry := range entries { + glog.Infof("Port Event %s", entry.raw) + if !e.writeLogToSocket(entry.raw) { + 
glog.Warning("Broken pipe detected while emitting port role logs, stopping.") + break + } + } +} + +// EmitClockClass emits the current clock class and accuracy for the specified configuration. +func (e *EventHandler) EmitClockClass(cfgName string) { + e.Lock() + state, ok := e.clkSyncState[cfgName] + if !ok { + e.Unlock() + return + } + clockClass := state.clockClass + clockAccuracy := state.clockAccuracy + e.Unlock() + e.announceClockClass(clockClass, clockAccuracy, cfgName) +} + +// EmitProcessStatusLog writes a process status log entry to the event socket +func (e *EventHandler) EmitProcessStatusLog(processName, cfgName string, status int64) { + message := fmt.Sprintf("%s[%d]:[%s] PTP_PROCESS_STATUS:%d", processName, time.Now().Unix(), cfgName, status) + glog.Info(message) + e.writeLogToSocket(message + "\n") +} diff --git a/pkg/event/event_socket_test.go b/pkg/event/event_socket_test.go new file mode 100644 index 00000000..1ac48832 --- /dev/null +++ b/pkg/event/event_socket_test.go @@ -0,0 +1,465 @@ +package event + +import ( + "net" + "os" + "path/filepath" + "regexp" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +// newTestEventHandler creates a minimal EventHandler for testing socket logic. +func newTestEventHandler(socketPath string) *EventHandler { + return &EventHandler{ + stdoutSocket: socketPath, + stdoutToSocket: true, + closeCh: make(chan bool, 1), + brokenPipeCh: make(chan struct{}, 1), + clkSyncState: map[string]*clockSyncState{}, + } +} + +// shortSocketPath returns a short socket path that fits within Unix socket +// path length limits (104 bytes on macOS, 108 on Linux). 
+func shortSocketPath(t *testing.T) string { + t.Helper() + dir, err := os.MkdirTemp("", "sock") + if err != nil { + t.Fatalf("failed to create temp dir: %v", err) + } + t.Cleanup(func() { os.RemoveAll(dir) }) + return filepath.Join(dir, "t.sock") +} + +// acceptAndClose accepts connections in a loop and closes them immediately. +// Useful for tests that only need to verify a connection can be established. +func acceptAndClose(listener net.Listener) { + for { + c, acceptErr := listener.Accept() + if acceptErr != nil { + return + } + c.Close() + } +} + +// acceptAndHold accepts connections in a loop and keeps them open (discards ref). +func acceptAndHold(listener net.Listener) { + for { + _, acceptErr := listener.Accept() + if acceptErr != nil { + return + } + } +} + +// acceptAndRead accepts connections and forwards received data to the channel. +func acceptAndRead(listener net.Listener, received chan<- string) { + for { + c, acceptErr := listener.Accept() + if acceptErr != nil { + return + } + go func(c net.Conn) { + defer c.Close() + buf := make([]byte, 1024) + for { + n, readErr := c.Read(buf) + if readErr != nil { + return + } + received <- string(buf[:n]) + } + }(c) + } +} + +// acceptAndReadOnce accepts connections in a loop and reads a single message from each. 
+func acceptAndReadOnce(listener net.Listener, received chan<- string) { + for { + c, acceptErr := listener.Accept() + if acceptErr != nil { + return + } + go func(c net.Conn) { + defer c.Close() + buf := make([]byte, 1024) + n, _ := c.Read(buf) + received <- string(buf[:n]) + }(c) + } +} + +// --- setConn / getConn --- + +func TestGetConn_ReturnsNilWhenUnset(t *testing.T) { + e := newTestEventHandler("") + assert.Nil(t, e.getConn()) +} + +func TestSetConn_StoresConnection(t *testing.T) { + e := newTestEventHandler("") + server, client := net.Pipe() + defer server.Close() + defer client.Close() + + e.setConn(client) + assert.Equal(t, client, e.getConn()) +} + +func TestSetConn_ClosesOldConnection(t *testing.T) { + e := newTestEventHandler("") + + _, oldClient := net.Pipe() + _, newClient := net.Pipe() + defer newClient.Close() + + e.setConn(oldClient) + e.setConn(newClient) + + // The old connection should be closed; writing to it should fail. + _, err := oldClient.Write([]byte("test")) + assert.Error(t, err, "old connection should be closed after setConn replaces it") + assert.Equal(t, newClient, e.getConn()) +} + +func TestSetConn_NilClearsAndClosesOldConnection(t *testing.T) { + e := newTestEventHandler("") + + _, client := net.Pipe() + e.setConn(client) + e.setConn(nil) + + assert.Nil(t, e.getConn()) + _, err := client.Write([]byte("test")) + assert.Error(t, err, "connection should be closed after setConn(nil)") +} + +func TestSetConn_SameConnectionNoClose(t *testing.T) { + e := newTestEventHandler("") + server, client := net.Pipe() + defer server.Close() + defer client.Close() + + e.setConn(client) + e.setConn(client) // same connection, should not close it + + // Connection should still be usable. 
+ go func() { + buf := make([]byte, 4) + _, _ = server.Read(buf) + }() + _, err := client.Write([]byte("test")) + assert.NoError(t, err, "connection should still be usable when setConn is called with the same connection") +} + +func TestGetSetConn_ConcurrentAccess(t *testing.T) { + e := newTestEventHandler("") + server, client := net.Pipe() + defer server.Close() + defer client.Close() + + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(2) + go func() { + defer wg.Done() + e.setConn(client) + }() + go func() { + defer wg.Done() + _ = e.getConn() + }() + } + wg.Wait() + // No race detector failures = success; use assert to satisfy unused-parameter lint. + assert.NotNil(t, e) +} + +// --- signalBrokenPipe --- + +func TestSignalBrokenPipe_SendsSignal(t *testing.T) { + e := newTestEventHandler("") + e.signalBrokenPipe() + + select { + case <-e.brokenPipeCh: + // expected + default: + t.Fatal("expected signal on brokenPipeCh") + } +} + +func TestSignalBrokenPipe_NonBlocking(t *testing.T) { + e := newTestEventHandler("") + + // Fill the channel. + e.brokenPipeCh <- struct{}{} + + // Second signal should not block. 
+ done := make(chan struct{}) + go func() { + e.signalBrokenPipe() + close(done) + }() + + select { + case <-done: + // expected: did not block + case <-time.After(1 * time.Second): + t.Fatal("signalBrokenPipe blocked when channel was already full") + } +} + +// --- writeLogToSocket --- + +func TestWriteLogToSocket_NilConnection(t *testing.T) { + e := newTestEventHandler("") + // conn is nil by default + result := e.writeLogToSocket("test message\n") + assert.False(t, result, "should return false when connection is nil") +} + +func TestWriteLogToSocket_SuccessfulWrite(t *testing.T) { + e := newTestEventHandler("") + + server, client := net.Pipe() + defer server.Close() + defer client.Close() + e.setConn(client) + + msg := "test message\n" + go func() { + buf := make([]byte, len(msg)) + _, _ = server.Read(buf) + }() + + result := e.writeLogToSocket(msg) + assert.True(t, result, "should return true on successful write") + assert.NotNil(t, e.getConn(), "connection should still be set after successful write") +} + +func TestWriteLogToSocket_BrokenPipeClearsConn(t *testing.T) { + // Create a socket pair and close the server side to simulate broken pipe. + server, client := net.Pipe() + server.Close() // close server so writes fail + + // Use an invalid socket path so reconnect fails. + // Send shutdown signal shortly after to abort reconnect backoff quickly. + e := newTestEventHandler("/nonexistent/test.sock") + e.setConn(client) + + go func() { + time.Sleep(50 * time.Millisecond) + e.closeCh <- true + }() + + result := e.writeLogToSocket("test message\n") + assert.False(t, result, "should return false when write fails and reconnect is not possible") + assert.Nil(t, e.getConn(), "connection should be nil after failed write") +} + +// --- reconnectEventSocket --- + +func TestReconnectEventSocket_SuccessWithUnixSocket(t *testing.T) { + // Create a temporary Unix domain socket with short path. 
+ socketPath := shortSocketPath(t) + listener, err := net.Listen("unix", socketPath) + assert.NoError(t, err) + defer listener.Close() + + go acceptAndClose(listener) + + e := newTestEventHandler(socketPath) + result := e.reconnectEventSocket() + assert.True(t, result, "should succeed when socket is available") + assert.NotNil(t, e.getConn(), "connection should be set after successful reconnect") + e.setConn(nil) // cleanup +} + +func TestReconnectEventSocket_FailsWithNoSocket(t *testing.T) { + e := newTestEventHandler("/nonexistent/path/test.sock") + + // Send shutdown signal shortly after to abort reconnect backoff quickly. + go func() { + time.Sleep(50 * time.Millisecond) + e.closeCh <- true + }() + + result := e.reconnectEventSocket() + assert.False(t, result, "should fail when socket path does not exist") + assert.Nil(t, e.getConn(), "connection should remain nil") +} + +func TestReconnectEventSocket_ReturnsExistingConnection(t *testing.T) { + e := newTestEventHandler("") + server, client := net.Pipe() + defer server.Close() + defer client.Close() + + e.setConn(client) + result := e.reconnectEventSocket() + assert.True(t, result, "should return true when connection already exists") + assert.Equal(t, client, e.getConn(), "existing connection should be unchanged") +} + +func TestReconnectEventSocket_RespondsToShutdown(t *testing.T) { + e := newTestEventHandler("/nonexistent/path/test.sock") + + // Signal shutdown shortly after reconnect starts. 
+ go func() { + time.Sleep(10 * time.Millisecond) + e.closeCh <- true + }() + + start := time.Now() + result := e.reconnectEventSocket() + elapsed := time.Since(start) + + assert.False(t, result, "should return false when shutdown signal received") + assert.Less(t, elapsed, 3*time.Second, "should return quickly on shutdown signal") +} + +func TestReconnectEventSocket_Serialized(t *testing.T) { + socketPath := shortSocketPath(t) + listener, err := net.Listen("unix", socketPath) + assert.NoError(t, err) + defer listener.Close() + + go acceptAndHold(listener) + + e := newTestEventHandler(socketPath) + var wg sync.WaitGroup + results := make([]bool, 5) + + for i := 0; i < 5; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + results[idx] = e.reconnectEventSocket() + }(i) + } + wg.Wait() + + // All should succeed (via reconnect or early return since conn is already set). + for i, r := range results { + assert.True(t, r, "goroutine %d should succeed", i) + } + assert.NotNil(t, e.getConn()) + e.setConn(nil) // cleanup +} + +// --- writeLogToSocket with real socket --- + +func TestWriteLogToSocket_WriteAndReconnect(t *testing.T) { + socketPath := shortSocketPath(t) + listener, err := net.Listen("unix", socketPath) + assert.NoError(t, err) + defer listener.Close() + + received := make(chan string, 10) + go acceptAndRead(listener, received) + + e := newTestEventHandler(socketPath) + + // Establish initial connection. + assert.True(t, e.reconnectEventSocket()) + + // Write successfully. + msg := "hello world\n" + result := e.writeLogToSocket(msg) + assert.True(t, result) + + // Verify the message was received. + select { + case got := <-received: + assert.Equal(t, msg, got) + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for message") + } + + // Simulate broken pipe: close the current connection. + e.setConn(nil) + + // writeLogToSocket should detect nil conn and return false. 
+ result = e.writeLogToSocket("should fail\n") + assert.False(t, result) + + e.setConn(nil) // cleanup +} + +// --- EmitProcessStatusLog --- + +func TestEmitProcessStatusLog_WritesToSocket(t *testing.T) { + socketPath := shortSocketPath(t) + listener, err := net.Listen("unix", socketPath) + assert.NoError(t, err) + defer listener.Close() + + received := make(chan string, 10) + go acceptAndRead(listener, received) + + e := newTestEventHandler(socketPath) + assert.True(t, e.reconnectEventSocket()) + + e.EmitProcessStatusLog("ptp4l", "ptp4l.0.config", 1) + + select { + case got := <-received: + assert.True(t, strings.Contains(got, "ptp4l"), "message should contain process name") + assert.True(t, strings.Contains(got, "ptp4l.0.config"), "message should contain config name") + assert.True(t, strings.Contains(got, "PTP_PROCESS_STATUS:1"), "message should contain status") + assert.True(t, strings.HasSuffix(got, "\n"), "message should end with newline") + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for process status message") + } + + e.setConn(nil) // cleanup +} + +func TestEmitProcessStatusLog_FormatIsCorrect(t *testing.T) { + e := newTestEventHandler("") + // No connection; should not panic, just log and return. + e.EmitProcessStatusLog("ptp4l", "test.config", 0) + + // Verify format by calling with a connection that captures output. 
+ socketPath := shortSocketPath(t) + listener, err := net.Listen("unix", socketPath) + assert.NoError(t, err) + defer listener.Close() + + received := make(chan string, 10) + go acceptAndReadOnce(listener, received) + + e2 := newTestEventHandler(socketPath) + assert.True(t, e2.reconnectEventSocket()) + + now := time.Now().Unix() + e2.EmitProcessStatusLog("phc2sys", "phc2sys.0.config", 0) + + select { + case got := <-received: + // Verify prefix and suffix without relying on exact timestamp match + assert.True(t, strings.HasPrefix(got, "phc2sys["), "message should start with process name") + assert.True(t, strings.HasSuffix(got, "]:[phc2sys.0.config] PTP_PROCESS_STATUS:0\n"), "message should end with config and status") + + // Parse and verify the embedded timestamp is within a reasonable range + re := regexp.MustCompile(`\[(\d+)\]`) + matches := re.FindStringSubmatch(got) + if assert.Len(t, matches, 2, "should contain a bracketed timestamp") { + timestamp, parseErr := strconv.ParseInt(matches[1], 10, 64) + assert.NoError(t, parseErr) + assert.InDelta(t, now, timestamp, 2, "timestamp should be within 2 seconds of test time") + } + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for message") + } + + e2.setConn(nil) // cleanup +} diff --git a/pkg/event/event_tbc.go b/pkg/event/event_tbc.go index 44bd7df9..7ac2f496 100644 --- a/pkg/event/event_tbc.go +++ b/pkg/event/event_tbc.go @@ -3,14 +3,12 @@ package event import ( "fmt" "math" - "net" "strings" "time" "github.com/k8snetworkplumbingwg/linuxptp-daemon/pkg/pmc" "github.com/k8snetworkplumbingwg/linuxptp-daemon/pkg/protocol" "github.com/k8snetworkplumbingwg/linuxptp-daemon/pkg/utils" - "github.com/prometheus/client_golang/prometheus" fbprotocol "github.com/facebook/time/ptp/protocol" "github.com/golang/glog" @@ -67,7 +65,10 @@ type LeadingClockParams struct { clockID string } -func (e *EventHandler) updateBCState(event EventChannel, c net.Conn) clockSyncState { +// updateBCState updates the BC/TSC 
state machine. +// Called with e.Lock() held. Returns the clock sync state and whether a TTSC clock class +// announcement is needed (the caller must perform the I/O after releasing the lock). +func (e *EventHandler) updateBCState(event EventChannel) (clockSyncState, bool) { cfgName := event.CfgName dpllState := PTP_NOTSET ts2phcState := PTP_FREERUN @@ -80,7 +81,7 @@ func (e *EventHandler) updateBCState(event EventChannel, c net.Conn) clockSyncSt leadingInterface := e.getLeadingInterfaceBC() if leadingInterface == LEADING_INTERFACE_UNKNOWN { glog.Infof("Leading interface is not yet identified, clock state reporting delayed.") - return clockSyncState{leadingIFace: leadingInterface} + return clockSyncState{leadingIFace: leadingInterface}, false } if _, ok := e.clkSyncState[cfgName]; !ok { @@ -138,7 +139,7 @@ func (e *EventHandler) updateBCState(event EventChannel, c net.Conn) clockSyncSt e.clkSyncState[cfgName].clkLog = fmt.Sprintf("T-BC[%d]:[%s] %s offset %d T-BC-STATUS %s\n", e.clkSyncState[cfgName].lastLoggedTime, cfgName, leadingInterface, e.clkSyncState[cfgName].clockOffset, e.clkSyncState[cfgName].state) - return *e.clkSyncState[cfgName] + return *e.clkSyncState[cfgName], false } glog.Info("current BC state: ", e.clkSyncState[cfgName].state) switch e.clkSyncState[cfgName].state { @@ -226,12 +227,9 @@ func (e *EventHandler) updateBCState(event EventChannel, c net.Conn) clockSyncSt e.clkSyncState[cfgName].clockOffset = e.getLargestOffset(cfgName) } + needsTTSCAnnounce := false if updateDownstreamData { - if gSycState.state == PTP_LOCKED { - go e.downstreamAnnounceIWF(cfgName, c) - } else { - go e.announceLocalData(cfgName, c) - } + go e.updateDownstreamData(cfgName) } // this will reduce log noise and prints 1 per sec logTime := time.Now().Unix() @@ -244,42 +242,57 @@ func (e *EventHandler) updateBCState(event EventChannel, c net.Conn) clockSyncSt glog.Infof("dpll State %s, tsphc state %s, BC state %s, BC offset %d", dpllState, ts2phcState, 
e.clkSyncState[cfgName].state, e.clkSyncState[cfgName].clockOffset) } - return rclockSyncState + return rclockSyncState, needsTTSCAnnounce } -func (e *EventHandler) announceClockClass(clockClass int, cfgName string, c net.Conn) { - message := fmt.Sprintf("ptp4l[%d]:[%s] CLOCK_CLASS_CHANGE %d\n", time.Now().Unix(), - cfgName, clockClass) - if e.stdoutToSocket { - if c != nil { - _, err := c.Write([]byte(message)) - if err != nil { - glog.Errorf("failed to write class change event %s", err.Error()) - } - } else { - glog.Errorf("failed to write class change event, connection is nil") - } - } else if e.clockClassMetric != nil { - e.clockClassMetric.With(prometheus.Labels{ - "process": PTP4lProcessName, "config": cfgName, "node": e.nodeName}).Set(float64(clockClass)) +func (e *EventHandler) updateDownstreamData(cfgName string) { + e.Lock() + data, ok := e.clkSyncState[cfgName] + if !ok { + e.Unlock() + return + } + state := data.state + e.Unlock() + if state == PTP_LOCKED { + go e.downstreamAnnounceIWF(cfgName) + } else { + go e.announceLocalData(cfgName) } - glog.Infof("%s", message) } // Implements Rec. ITU-T G.8275 (2024) Amd. 
1 (08/2024) // Table VIII.3 − T-BC-/ T-BC-P/ T-BC-A Announce message contents // for free-run (acquiring), holdover within / out of the specification -func (e *EventHandler) announceLocalData(cfgName string, c net.Conn) { +func (e *EventHandler) announceLocalData(cfgName string) { + // Snapshot shared data under lock to prevent data races with updateBCState + e.Lock() + clockID := e.LeadingClockData.clockID + controlledPortsConfig := e.LeadingClockData.controlledPortsConfig + downstreamTimeProperties := e.LeadingClockData.downstreamTimeProperties + state, ok := e.clkSyncState[cfgName] + if !ok { + e.Unlock() + return + } + clockClass := state.clockClass + clockAccuracy := state.clockAccuracy + e.Unlock() + egp := protocol.ExternalGrandmasterProperties{ - GrandmasterIdentity: e.LeadingClockData.clockID, + GrandmasterIdentity: clockID, StepsRemoved: 0, } glog.Infof("EGP %++v", egp) - go pmc.RunPMCExpSetExternalGMPropertiesNP(e.LeadingClockData.controlledPortsConfig, egp) - e.announceClockClass(int(e.clkSyncState[cfgName].clockClass), cfgName, c) + go func() { + if err := pmc.RunPMCExpSetExternalGMPropertiesNP(controlledPortsConfig, egp); err != nil { + glog.Errorf("Failed to set external GM properties: %v", err) + } + }() + e.announceClockClass(clockClass, clockAccuracy, cfgName) gs := protocol.GrandmasterSettings{ ClockQuality: fbprotocol.ClockQuality{ - ClockClass: e.clkSyncState[cfgName].clockClass, + ClockClass: clockClass, ClockAccuracy: fbprotocol.ClockAccuracyUnknown, OffsetScaledLogVariance: 0xffff, }, @@ -287,7 +300,7 @@ func (e *EventHandler) announceLocalData(cfgName string, c net.Conn) { TimeSource: fbprotocol.TimeSourceInternalOscillator, }, } - switch e.clkSyncState[cfgName].clockClass { + switch clockClass { case protocol.ClockClassFreerun: gs.TimePropertiesDS.CurrentUtcOffsetValid = false gs.TimePropertiesDS.Leap59 = false @@ -298,39 +311,52 @@ func (e *EventHandler) announceLocalData(cfgName string, c net.Conn) { 
gs.TimePropertiesDS.FrequencyTraceable = false gs.TimePropertiesDS.CurrentUtcOffset = int32(leap.GetUtcOffset()) case fbprotocol.ClockClass(165), fbprotocol.ClockClass(135): - if e.LeadingClockData.upstreamTimeProperties == nil { + if downstreamTimeProperties == nil { glog.Info("Pending upstream clock data acquisition, skip updates") return } - gs.TimePropertiesDS.CurrentUtcOffsetValid = e.LeadingClockData.upstreamTimeProperties.CurrentUtcOffsetValid - gs.TimePropertiesDS.Leap59 = e.LeadingClockData.upstreamTimeProperties.Leap59 - gs.TimePropertiesDS.Leap61 = e.LeadingClockData.upstreamTimeProperties.Leap61 + gs.TimePropertiesDS.CurrentUtcOffsetValid = downstreamTimeProperties.CurrentUtcOffsetValid + gs.TimePropertiesDS.Leap59 = downstreamTimeProperties.Leap59 + gs.TimePropertiesDS.Leap61 = downstreamTimeProperties.Leap61 gs.TimePropertiesDS.PtpTimescale = true - if e.clkSyncState[cfgName].clockClass == fbprotocol.ClockClass(135) { + if clockClass == fbprotocol.ClockClass(135) { gs.TimePropertiesDS.TimeTraceable = true } else { gs.TimePropertiesDS.TimeTraceable = false } // TODO: get the real freq traceability status when implemented gs.TimePropertiesDS.FrequencyTraceable = false - gs.TimePropertiesDS.CurrentUtcOffset = e.LeadingClockData.upstreamTimeProperties.CurrentUtcOffset + gs.TimePropertiesDS.CurrentUtcOffset = downstreamTimeProperties.CurrentUtcOffset default: } - go pmc.RunPMCExpSetGMSettings(e.LeadingClockData.controlledPortsConfig, gs) + go func() { + if err := pmc.RunPMCExpSetGMSettings(controlledPortsConfig, gs); err != nil { + glog.Errorf("Failed to set GM settings: %v", err) + } + }() } // this function runs in a goroutine -func (e *EventHandler) downstreamAnnounceIWF(cfgName string, c net.Conn) { +func (e *EventHandler) downstreamAnnounceIWF(cfgName string) { ptpCfgName := strings.Replace(cfgName, "ts2phc", "ptp4l", 1) glog.Infof("downstreamAnnounceIWF: %s", ptpCfgName) + + e.Lock() + controlledPortsConfig := e.LeadingClockData.controlledPortsConfig 
+ e.Unlock() + results, err := pmc.RunPMCExpGetMultiple(ptpCfgName) if err != nil { - glog.Error(err) + glog.Error("Failed to fetch upstream data, downstream data can not be updated: ", err) + return } + + e.Lock() e.LeadingClockData.upstreamTimeProperties = &results.TimePropertiesDS e.LeadingClockData.upstreamParentDataSet = &results.ParentDataSet e.LeadingClockData.upstreamCurrentDSStepsRemoved = results.CurrentDS.StepsRemoved + e.Unlock() gs := protocol.GrandmasterSettings{ ClockQuality: fbprotocol.ClockQuality{ @@ -342,18 +368,22 @@ func (e *EventHandler) downstreamAnnounceIWF(cfgName string, c net.Conn) { } es := protocol.ExternalGrandmasterProperties{ GrandmasterIdentity: results.ParentDataSet.GrandmasterIdentity, - // stepsRemoved at this point is already incremented, representing the current clock position - StepsRemoved: results.CurrentDS.StepsRemoved, + StepsRemoved: results.CurrentDS.StepsRemoved, } glog.Infof("%++v", es) - e.announceClockClass(int(gs.ClockQuality.ClockClass), cfgName, c) - if err = pmc.RunPMCExpSetExternalGMPropertiesNP(e.LeadingClockData.controlledPortsConfig, es); err != nil { + e.announceClockClass(gs.ClockQuality.ClockClass, gs.ClockQuality.ClockAccuracy, cfgName) + if err := pmc.RunPMCExpSetExternalGMPropertiesNP(controlledPortsConfig, es); err != nil { glog.Error(err) } - if err = pmc.RunPMCExpSetGMSettings(e.LeadingClockData.controlledPortsConfig, gs); err != nil { + if err := pmc.RunPMCExpSetGMSettings(controlledPortsConfig, gs); err != nil { glog.Error(err) } glog.Infof("%++v", es) + + e.Lock() + e.LeadingClockData.downstreamParentDataSet = &results.ParentDataSet + e.LeadingClockData.downstreamTimeProperties = &results.TimePropertiesDS + e.Unlock() } func (e *EventHandler) inSyncCondition(cfgName string) bool { diff --git a/pkg/utils/clock_class_log.go b/pkg/utils/clock_class_log.go new file mode 100644 index 00000000..e4fed9a6 --- /dev/null +++ b/pkg/utils/clock_class_log.go @@ -0,0 +1,66 @@ +package utils + +import ( + 
"errors" + "fmt" + "net" + "syscall" + "time" + + fbprotocol "github.com/facebook/time/ptp/protocol" + "github.com/golang/glog" +) + +// GetClockClassLogMessage formats a clock class change message with timestamp. +func GetClockClassLogMessage(name, configName string, clockClass fbprotocol.ClockClass) string { + return fmt.Sprintf( + "%s[%d]:[%s] CLOCK_CLASS_CHANGE %d\n", + name, time.Now().Unix(), configName, clockClass, + ) +} + +// IsBrokenPipe checks if the error indicates a broken pipe or connection issue +func IsBrokenPipe(err error) bool { + if err == nil { + return false + } + + // Check for common connection errors + if errors.Is(err, syscall.EPIPE) || // Broken pipe + errors.Is(err, syscall.ECONNRESET) || // Connection reset by peer + errors.Is(err, syscall.ECONNREFUSED) || // Connection refused + errors.Is(err, syscall.ENOTCONN) { // Not connected + return true + } + + // Check for net.OpError wrapping these errors + var opErr *net.OpError + if errors.As(err, &opErr) { + return IsBrokenPipe(opErr.Err) + } + + return false +} + +// socketWriteTimeout is the maximum time to wait for a write to the event socket. +const socketWriteTimeout = 5 * time.Second + +// EmitClockClass writes a clock class change event to log and the network connection if provided. +// Returns true if a broken pipe error occurred (caller should reconnect and retry). 
+func EmitClockClass(c net.Conn, name string, configName string, clockClass fbprotocol.ClockClass) bool { + logMsg := GetClockClassLogMessage(name, configName, clockClass) + glog.Info(logMsg) + if c == nil { + return false + } + + if err := c.SetWriteDeadline(time.Now().Add(socketWriteTimeout)); err != nil { + glog.Warningf("Failed to set write deadline for clock class event: %v", err) + } + _, err := c.Write([]byte(logMsg)) + if err != nil { + glog.Errorf("failed to write class change event to socket: %s", err.Error()) + return IsBrokenPipe(err) + } + return false +} diff --git a/pkg/utils/reconnect.go b/pkg/utils/reconnect.go new file mode 100644 index 00000000..5c0ea81a --- /dev/null +++ b/pkg/utils/reconnect.go @@ -0,0 +1,82 @@ +// Package utils provides utility functions for the linuxptp daemon. +// +//nolint:revive // utils is a common and acceptable package name +package utils + +import ( + "context" + "net" + "time" + + "github.com/golang/glog" +) + +const ( + // DefaultMaxReconnectAttempts is the default maximum number of reconnection attempts. + DefaultMaxReconnectAttempts = 10 + // DefaultReconnectBackoffBase is the default initial backoff duration between reconnection attempts. + DefaultReconnectBackoffBase = 100 * time.Millisecond + // DefaultMaxReconnectBackoff is the default maximum backoff duration between reconnection attempts. + DefaultMaxReconnectBackoff = 5 * time.Second +) + +// ReconnectConfig holds the parameters for socket reconnection with exponential backoff. +type ReconnectConfig struct { + // MaxAttempts is the maximum number of reconnection attempts before giving up. + MaxAttempts int + // BackoffBase is the initial backoff duration between reconnection attempts. + BackoffBase time.Duration + // MaxBackoff is the maximum backoff duration (cap for exponential growth). + MaxBackoff time.Duration +} + +// DefaultReconnectConfig returns a ReconnectConfig with sensible defaults. 
+func DefaultReconnectConfig() ReconnectConfig { + return ReconnectConfig{ + MaxAttempts: DefaultMaxReconnectAttempts, + BackoffBase: DefaultReconnectBackoffBase, + MaxBackoff: DefaultMaxReconnectBackoff, + } +} + +// ReconnectWithBackoff attempts to establish a connection using the provided dial function. +// It retries with exponential backoff up to cfg.MaxAttempts times, and is responsive to +// cancellation via the provided context. +// Returns the new connection, or nil if all attempts are exhausted or the context is cancelled. +func ReconnectWithBackoff(ctx context.Context, dialFn func() (net.Conn, error), cfg ReconnectConfig) net.Conn { + glog.Info("Attempting to reconnect to event socket") + + backoff := cfg.BackoffBase + for attempt := 1; attempt <= cfg.MaxAttempts; attempt++ { + select { + case <-ctx.Done(): + glog.Info("Stop signal received, aborting reconnect attempt") + return nil + default: + } + + newConn, err := dialFn() + if err == nil { + glog.Infof("Successfully reconnected to event socket after %d attempt(s)", attempt) + return newConn + } + + if attempt < cfg.MaxAttempts { + glog.Warningf("Failed to reconnect to event socket (attempt %d/%d): %v, retrying in %v", + attempt, cfg.MaxAttempts, err, backoff) + select { + case <-time.After(backoff): + case <-ctx.Done(): + glog.Info("Stop signal received during backoff, aborting reconnect") + return nil + } + backoff *= 2 + if backoff > cfg.MaxBackoff { + backoff = cfg.MaxBackoff + } + } + } + + glog.Errorf("Failed to reconnect to event socket after %d attempts", cfg.MaxAttempts) + return nil +} diff --git a/pkg/utils/reconnect_test.go b/pkg/utils/reconnect_test.go new file mode 100644 index 00000000..e88a428d --- /dev/null +++ b/pkg/utils/reconnect_test.go @@ -0,0 +1,168 @@ +package utils_test + +import ( + "context" + "errors" + "net" + "syscall" + "testing" + "time" + + "github.com/k8snetworkplumbingwg/linuxptp-daemon/pkg/utils" + "github.com/stretchr/testify/assert" +) + +// --- 
DefaultReconnectConfig --- + +func TestDefaultReconnectConfig(t *testing.T) { + cfg := utils.DefaultReconnectConfig() + assert.Equal(t, utils.DefaultMaxReconnectAttempts, cfg.MaxAttempts) + assert.Equal(t, utils.DefaultReconnectBackoffBase, cfg.BackoffBase) + assert.Equal(t, utils.DefaultMaxReconnectBackoff, cfg.MaxBackoff) +} + +// --- IsBrokenPipe --- + +func TestIsBrokenPipe(t *testing.T) { + tests := []struct { + name string + err error + expected bool + }{ + {"nil error", nil, false}, + {"generic error", errors.New("some error"), false}, + {"EPIPE", syscall.EPIPE, true}, + {"ECONNRESET", syscall.ECONNRESET, true}, + {"ECONNREFUSED", syscall.ECONNREFUSED, true}, + {"ENOTCONN", syscall.ENOTCONN, true}, + {"wrapped EPIPE in OpError", &net.OpError{Op: "write", Err: syscall.EPIPE}, true}, + {"wrapped generic in OpError", &net.OpError{Op: "write", Err: errors.New("other")}, false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.expected, utils.IsBrokenPipe(tt.err)) + }) + } +} + +// --- ReconnectWithBackoff --- + +func TestReconnectWithBackoff_SuccessOnFirstAttempt(t *testing.T) { + server, client := net.Pipe() + defer server.Close() + defer client.Close() + + dialFn := func() (net.Conn, error) { + return client, nil + } + cfg := utils.ReconnectConfig{ + MaxAttempts: 3, + BackoffBase: 1 * time.Millisecond, + MaxBackoff: 10 * time.Millisecond, + } + conn := utils.ReconnectWithBackoff(context.Background(), dialFn, cfg) + assert.NotNil(t, conn) +} + +func TestReconnectWithBackoff_SuccessAfterRetries(t *testing.T) { + server, client := net.Pipe() + defer server.Close() + defer client.Close() + + attempts := 0 + dialFn := func() (net.Conn, error) { + attempts++ + if attempts < 3 { + return nil, errors.New("connection refused") + } + return client, nil + } + cfg := utils.ReconnectConfig{ + MaxAttempts: 5, + BackoffBase: 1 * time.Millisecond, + MaxBackoff: 5 * time.Millisecond, + } + conn := 
utils.ReconnectWithBackoff(context.Background(), dialFn, cfg) + assert.NotNil(t, conn) + assert.Equal(t, 3, attempts) +} + +func TestReconnectWithBackoff_ExhaustedRetries(t *testing.T) { + dialFn := func() (net.Conn, error) { + return nil, errors.New("connection refused") + } + cfg := utils.ReconnectConfig{ + MaxAttempts: 3, + BackoffBase: 1 * time.Millisecond, + MaxBackoff: 5 * time.Millisecond, + } + conn := utils.ReconnectWithBackoff(context.Background(), dialFn, cfg) + assert.Nil(t, conn) +} + +func TestReconnectWithBackoff_CancelledBeforeStart(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() // cancel immediately + + called := false + dialFn := func() (net.Conn, error) { + called = true + return nil, errors.New("should not be called") + } + cfg := utils.ReconnectConfig{ + MaxAttempts: 5, + BackoffBase: 1 * time.Millisecond, + MaxBackoff: 5 * time.Millisecond, + } + conn := utils.ReconnectWithBackoff(ctx, dialFn, cfg) + assert.Nil(t, conn) + assert.False(t, called, "dialFn should not have been called after context was cancelled") +} + +func TestReconnectWithBackoff_CancelledDuringBackoff(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + attempts := 0 + dialFn := func() (net.Conn, error) { + attempts++ + if attempts == 1 { + // Cancel context after first failed attempt; will be detected during backoff + go func() { + time.Sleep(1 * time.Millisecond) + cancel() + }() + } + return nil, errors.New("connection refused") + } + cfg := utils.ReconnectConfig{ + MaxAttempts: 10, + BackoffBase: 100 * time.Millisecond, // long enough for cancel to fire + MaxBackoff: 1 * time.Second, + } + start := time.Now() + conn := utils.ReconnectWithBackoff(ctx, dialFn, cfg) + elapsed := time.Since(start) + assert.Nil(t, conn) + assert.Equal(t, 1, attempts, "should have stopped after first attempt due to cancellation") + assert.Less(t, elapsed, 500*time.Millisecond, "should have returned quickly after cancellation") +} + 
+func TestReconnectWithBackoff_BackoffCapsAtMaxBackoff(t *testing.T) {
+	attempts := 0
+	dialFn := func() (net.Conn, error) {
+		attempts++
+		return nil, errors.New("connection refused")
+	}
+	cfg := utils.ReconnectConfig{
+		MaxAttempts: 10,
+		BackoffBase: 1 * time.Millisecond,
+		MaxBackoff:  2 * time.Millisecond, // base=1ms, doubles to 2ms then caps
+	}
+	start := time.Now()
+	conn := utils.ReconnectWithBackoff(context.Background(), dialFn, cfg)
+	elapsed := time.Since(start)
+	assert.Nil(t, conn)
+	assert.Equal(t, 10, attempts)
+	// Capped: 9 sleeps = 1 + 8*2 = 17ms (plus overhead). Uncapped doubling would sleep 1+2+...+256 = 511ms, so the bound below fails if MaxBackoff is ignored.
+	assert.Less(t, elapsed, 250*time.Millisecond, "backoff should be capped and fast")
+}