Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/cloudwego/kitex

go 1.20
go 1.21

require (
github.com/bytedance/gopkg v0.1.3
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2024 CloudWeGo Authors
* Copyright 2026 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -14,41 +14,22 @@
* limitations under the License.
*/

package grpc
package stream

import (
"context"
"sync/atomic"
)
import "context"

// contextWithCancelReason implements context.Context
// with a cancel func for passing cancel reason
// NOTE: use context.WithCancelCause when go1.20?
// with Err() retrieving cause err with context.Cause automatically.
// Whether using gRPC or ttstream, the ctx.Err() returns protocol-specific errors rather than context.Canceled or context.DeadlineExceeded.
// When using context.WithCancelCause, an additional layer of encapsulation is still required to avoid breaking changes.
type contextWithCancelReason struct {
context.Context

cancel context.CancelFunc
reason atomic.Value
}

func (c *contextWithCancelReason) Err() error {
err := c.reason.Load()
if err != nil {
return err.(error)
}
return c.Context.Err()
}

func (c *contextWithCancelReason) CancelWithReason(reason error) {
if reason != nil {
c.reason.CompareAndSwap(nil, reason)
}
c.cancel()
return context.Cause(c.Context)
}

type cancelWithReason func(reason error)

func newContextWithCancelReason(ctx context.Context, cancel context.CancelFunc) (context.Context, cancelWithReason) {
ret := &contextWithCancelReason{Context: ctx, cancel: cancel}
return ret, ret.CancelWithReason
func NewContextWithCancelReason(ctx context.Context) context.Context {
return &contextWithCancelReason{Context: ctx}
}
56 changes: 56 additions & 0 deletions internal/stream/context_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2026 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package stream

import (
"context"
"errors"
"testing"
"time"

"github.com/cloudwego/kitex/internal/test"
)

func Test_contextWithCancelReason(t *testing.T) {
ctx := context.Background()
newCtx := NewContextWithCancelReason(ctx)
test.Assert(t, newCtx.Err() == nil, newCtx.Err())

ctx, cancel := context.WithCancel(context.Background())
newCtx = NewContextWithCancelReason(ctx)
cancel()
test.Assert(t, newCtx.Err() == context.Canceled, newCtx.Err())

ctx, cancel = context.WithTimeout(context.Background(), 20*time.Millisecond)
newCtx = NewContextWithCancelReason(ctx)
time.Sleep(50 * time.Millisecond)
cancel()
test.Assert(t, newCtx.Err() == context.DeadlineExceeded, newCtx.Err())

ctx, cancelCause := context.WithCancelCause(context.Background())
newCtx = NewContextWithCancelReason(ctx)
err := errors.New("test")
cancelCause(err)
test.Assert(t, newCtx.Err() == err, newCtx.Err())

ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
ctx, cancelCause = context.WithCancelCause(ctx)
newCtx = NewContextWithCancelReason(ctx)
cancelCause(err)
cancel()
test.Assert(t, newCtx.Err() == err, newCtx.Err())
}
43 changes: 0 additions & 43 deletions pkg/remote/trans/nphttp2/grpc/context_test.go

This file was deleted.

63 changes: 18 additions & 45 deletions pkg/remote/trans/nphttp2/grpc/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,8 @@ import (
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/peer"
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/status"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/pkg/utils"
)

// ticker is used to manage closeStreamTask.
// it triggers and cleans up actively cancelled streams every 5s.
// Streaming QPS is generally not too high, if there is a requirement for timeliness, then consider making it configurable.
// To reduce the overhead of goroutines in a multi-connection scenario, use the Sync SharedTicker
var ticker = utils.NewSyncSharedTicker(5 * time.Second)

// http2Client implements the ClientTransport interface with HTTP2.
type http2Client struct {
lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
Expand Down Expand Up @@ -213,13 +206,6 @@ func newHTTP2Client(ctx context.Context, conn net.Conn, opts ConnectOptions,
}
}
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
task := &closeStreamTask{t: t}
t.onClose = func() {
onClose()
// when http2Client has been closed, remove this task
ticker.Delete(task)
}
ticker.Add(task)

// Start the reader goroutine for incoming message. Each transport has
// a dedicated goroutine which reads HTTP2 frame from network. Then it
Expand Down Expand Up @@ -292,34 +278,6 @@ func newHTTP2Client(ctx context.Context, conn net.Conn, opts ConnectOptions,
return t, nil
}

// closeStreamTask is used to clean up streams that have been actively cancelled by users
type closeStreamTask struct {
t *http2Client
toCloseStreams []*Stream
}

func (task *closeStreamTask) Tick() {
trans := task.t
trans.mu.Lock()
for _, stream := range trans.activeStreams {
select {
// judge whether stream has been canceled
case <-stream.Context().Done():
task.toCloseStreams = append(task.toCloseStreams, stream)
default:
}
}
trans.mu.Unlock()

for i, stream := range task.toCloseStreams {
// uniformly converted to status error
sErr := ContextErr(stream.Context().Err())
trans.closeStream(stream, sErr, true, http2.ErrCodeCancel, status.Convert(sErr), nil, false)
task.toCloseStreams[i] = nil
}
task.toCloseStreams = task.toCloseStreams[:0]
}

type clientTransportDump struct {
LocalAddress string `json:"local_address"`
State transportState `json:"transport_state"`
Expand Down Expand Up @@ -567,12 +525,23 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
}
return true
}
stop := context.AfterFunc(s.ctx, func() {
sErr := ContextErr(s.ctx.Err())
t.closeStream(s, sErr, true, http2.ErrCodeCancel, status.Convert(sErr), nil, false)
})
s.ctxCleanUp = stop
defer func() {
// If exiting abnormally, execute stop to prevent leak
if err != nil {
stop()
}
}()
for {
success, err := t.controlBuf.executeAndPut(func(it interface{}) bool {
success, eErr := t.controlBuf.executeAndPut(func(it interface{}) bool {
return checkForHeaderListSize(it) && checkForStreamQuota(it)
}, hdr)
if err != nil {
return nil, err
if eErr != nil {
return nil, eErr
}
if success {
break
Expand Down Expand Up @@ -677,6 +646,10 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
// This will unblock write.
close(s.done)
// invoke stop func of ctx.AfterFunc to avoid leak
if s.ctxCleanUp != nil {
s.ctxCleanUp()
}
}

// Close kicks off the shutdown process of the transport. This should be called
Expand Down
17 changes: 14 additions & 3 deletions pkg/remote/trans/nphttp2/grpc/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"golang.org/x/net/http2/hpack"
"google.golang.org/protobuf/proto"

internal_stream "github.com/cloudwego/kitex/internal/stream"
"github.com/cloudwego/kitex/pkg/remote/codec/protobuf/encoding"
"github.com/cloudwego/kitex/pkg/remote/transmeta"

Expand Down Expand Up @@ -343,13 +344,23 @@ func (t *http2Server) operateHeaders(frame *grpcframe.MetaHeadersFrame, handle f
// s is just created by the caller. No lock needed.
s.state = streamReadDone
}
var cancel context.CancelFunc

if state.data.timeoutSet {
var cancel context.CancelFunc
var cancelCause context.CancelCauseFunc
s.ctx, cancel = context.WithTimeout(t.ctx, state.data.timeout)
s.ctx, cancelCause = context.WithCancelCause(s.ctx)
s.cancel = func(cause error) {
cancelCause(cause)
cancel()
}
} else {
s.ctx, cancel = context.WithCancel(t.ctx)
s.ctx, s.cancel = context.WithCancelCause(t.ctx)
}
s.ctx, s.cancel = newContextWithCancelReason(s.ctx, cancel)
// The semantics of `ctx.Err()` have changed. It now returns gRPC internal status error.
// If we just use context.WithCancelCause, users must use context.Cause to retrieve the previous error. This results a breaking change.
// Therefore, we need to encapsulate a Context that automatically executes `context.Cause` when `ctx.Err` is called.
s.ctx = internal_stream.NewContextWithCancelReason(s.ctx)
// Attach the received metadata to the context.
if len(state.data.mdata) > 0 {
s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata)
Expand Down
3 changes: 3 additions & 0 deletions pkg/remote/trans/nphttp2/grpc/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"math"
"sync"
"testing"
"time"

"github.com/cloudwego/kitex/internal/test"
"github.com/cloudwego/kitex/pkg/rpcinfo"
Expand Down Expand Up @@ -622,6 +623,7 @@ func Test_trace(t *testing.T) {
req := []byte("hello")
err = cli.Write(s, nil, req, &Options{})
test.Assert(t, err == nil, err)
time.Sleep(50 * time.Millisecond)
cancelFunc()

<-srv.srvReady
Expand All @@ -643,6 +645,7 @@ func Test_trace(t *testing.T) {
test.Assert(t, err == nil, err)
err = cli.Write(s, nil, nil, &Options{Last: true})
test.Assert(t, err == nil, err)
time.Sleep(50 * time.Millisecond)
cancelFunc()

<-srv.srvReady
Expand Down
20 changes: 11 additions & 9 deletions pkg/remote/trans/nphttp2/grpc/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"sync"
"sync/atomic"

internal_stream "github.com/cloudwego/kitex/internal/stream"
"github.com/cloudwego/kitex/pkg/kerrors"
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/codes"
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/metadata"
Expand Down Expand Up @@ -234,13 +235,14 @@ const (
// Stream represents an RPC in the transport layer.
type Stream struct {
id uint32
st ServerTransport // nil for client side Stream
ct *http2Client // nil for server side Stream
ctx context.Context // the associated context of the stream
cancel cancelWithReason // always nil for client side Stream
done chan struct{} // closed at the end of stream to unblock writers. On the client side.
ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance)
method string // the associated RPC method of the stream
st ServerTransport // nil for client side Stream
ct *http2Client // nil for server side Stream
ctx context.Context // the associated context of the stream
cancel context.CancelCauseFunc
done chan struct{} // closed at the end of stream to unblock writers. On the client side.
ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance)
ctxCleanUp func() bool // the stop func of context.AfterFunc, nil for server side Stream
method string // the associated RPC method of the stream
recvCompress string
sendCompress string
buf *recvBuffer
Expand Down Expand Up @@ -555,8 +557,8 @@ func CreateStream(ctx context.Context, id uint32, requestRead func(i int), metho
hdrMu: sync.Mutex{},
}

ctx, cancel := context.WithCancel(ctx)
stream.ctx, stream.cancel = newContextWithCancelReason(ctx, cancel)
stream.ctx, stream.cancel = context.WithCancelCause(ctx)
stream.ctx = internal_stream.NewContextWithCancelReason(stream.ctx)
return stream
}

Expand Down
Loading
Loading