-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathConnectionPoolProvider.cs
More file actions
156 lines (142 loc) · 8.13 KB
/
ConnectionPoolProvider.cs
File metadata and controls
156 lines (142 loc) · 8.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
/*
* Copyright 2018 Stanislav Muhametsin. All rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
*
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using CBAM.Abstractions.Implementation.NetworkStream;
using IOUtils.Network.Configuration;
using ResourcePooling.Async.Abstractions;
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using UtilPack;
namespace CBAM.NATS.Implementation
{
using TIntermediateState = ValueTuple<ClientProtocol.ReadState, Reference<ServerInformation>, CancellationToken, Stream>;
public sealed class NATSConnectionPoolProvider : AbstractAsyncResourceFactoryProvider<NATSConnection, NATSConnectionCreationInfo>
{
public static AsyncResourceFactory<NATSConnection, NATSConnectionCreationInfo> Factory { get; } = new DefaultAsyncResourceFactory<NATSConnection, NATSConnectionCreationInfo>( config =>
config.NewFactoryParametrizer<NATSConnectionCreationInfo, NATSConnectionCreationInfoData, NATSConnectionConfiguration, NATSInitializationConfiguration, NATSProtocolConfiguration, NATSPoolingConfiguration>()
.BindPublicConnectionType<NATSConnection>()
.CreateStatefulDelegatingConnectionFactory(
Encoding.ASCII.CreateDefaultEncodingInfo(),
( parameters, encodingInfo, stringPool, stringPoolIsDedicated, socketOrNull, stream, token ) => new TIntermediateState( new ClientProtocol.ReadState(), new Reference<ServerInformation>(), token, stream ),
async ( parameters, encodingInfo, stringPool, stringPoolIsDedicated, state ) =>
{
// First, read the INFO message from server
var rState = state.Item1;
var buffer = rState.Buffer;
var aState = rState.BufferAdvanceState;
await state.Item4.ReadUntilMaybeAsync( buffer, aState, ClientProtocolConsts.CRLF, ClientProtocolConsts.READ_COUNT );
var array = buffer.Array;
var idx = 0;
var end = aState.BufferOffset;
if ( end < 7
|| ( array.ReadInt32BEFromBytes( ref idx ) & ClientProtocolConsts.UPPERCASE_MASK_FULL ) != ClientProtocolConsts.INFO_INT
|| ( array[idx] != ClientProtocolConsts.SPACE
&& array[idx] != ClientProtocolConsts.TAB )
)
{
throw new NATSException( "Invalid INFO message at startup." );
}
++idx;
var serverInfo = ClientProtocol.DeserializeInfoMessage( array, idx, end - idx, NATSAuthenticationConfiguration.PasswordByteEncoding );
state.Item2.Value = serverInfo;
var sslMode = parameters.CreationData.Connection?.ConnectionSSLMode ?? ConnectionSSLMode.NotRequired;
var serverNeedsSSL = serverInfo.SSLRequired;
if ( serverNeedsSSL && sslMode == ConnectionSSLMode.NotRequired )
{
throw new NATSException( "Server requires SSL, but client does not." );
}
else if ( !serverNeedsSSL && sslMode == ConnectionSSLMode.Required )
{
throw new NATSException( "Client requires SSL, but server does not." );
}
else if ( serverInfo.AuthenticationRequired )
{
throw new NotImplementedException();
}
// We should not receive anything else except info message at start, but let's just make sure we leave anything extra still to be visible to client protocol
ClientProtocol.SetPreReadLength( rState );
return serverNeedsSSL;
},
() => new NATSException( "Server accepted SSL request, but the creation parameters did not have callback to create SSL stream" ),
() => new NATSException( "Server does not support SSL." ),
() => new NATSException( "SSL stream creation callback returned null." ),
() => new NATSException( "Authentication callback given by SSL stream creation callback was null." ),
inner => new NATSException( "Unable to start SSL client.", inner ),
async ( parameters, encodingInfo, stringPool, stringPoolIsDedicated, stream, socketOrNull, token, state ) =>
{
var paramData = parameters.CreationData;
var initConfig = paramData.Initialization;
var protoConfig = initConfig?.Protocol ?? new NATSProtocolConfiguration();
var authConfig = initConfig?.Authentication;
var wState = new ClientProtocol.WriteState();
var serverInfo = state.Item2.Value;
await ClientProtocol.InitializeNewConnection( new ClientInformation()
{
IsVerbose = protoConfig.Verbose,
IsPedantic = protoConfig.Pedantic,
SSLRequired = serverInfo.SSLRequired,
AuthenticationToken = authConfig?.AuthenticationToken,
Username = authConfig?.Username,
Password = authConfig?.Password,
ClientName = protoConfig.ClientName,
ClientLanguage = protoConfig.ClientLanguage,
ClientVersion = protoConfig.ClientVersion
}, NATSAuthenticationConfiguration.PasswordByteEncoding, wState, stream, token );
return new ClientProtocolPoolInfo( new ClientProtocol( new ClientProtocol.ClientProtocolIOState(
new DuplexBufferedAsyncStream( stream, Math.Max( NATSProtocolConfiguration.DEFAULT_BUFFER_SIZE, protoConfig.StreamBufferSize ) ),
stringPool,
encodingInfo,
wState,
state.Item1
), serverInfo ) );
},
protocol => new ValueTask<NATSConnectionImpl>( new NATSConnectionImpl( NATSConnectionVendorFunctionalityImpl.Instance, protocol.Protocol ) ),
( protocol, connection ) => new CBAM.Abstractions.Implementation.StatelessConnectionAcquireInfo<NATSConnectionImpl, ClientProtocolPoolInfo, Stream>( connection, protocol, protocol.Protocol.Stream ),
( functionality, connection, token, error ) => functionality.Protocol?.Stream
) );
public NATSConnectionPoolProvider()
: base( typeof( NATSConnectionCreationInfoData ) )
{
}
protected override AsyncResourceFactory<NATSConnection, NATSConnectionCreationInfo> CreateFactory()
{
return Factory;
}
protected override NATSConnectionCreationInfo TransformFactoryParameters( Object creationParameters )
{
ArgumentValidator.ValidateNotNull( nameof( creationParameters ), creationParameters );
NATSConnectionCreationInfo retVal;
if ( creationParameters is NATSConnectionCreationInfoData creationData )
{
retVal = new NATSConnectionCreationInfo( creationData );
}
else if ( creationParameters is NATSConnectionCreationInfo creationInfo )
{
retVal = creationInfo;
}
else
{
throw new ArgumentException( $"The {nameof( creationParameters )} must be instance of {typeof( NATSConnectionCreationInfoData ).FullName}." );
}
return retVal;
}
}
}