1414
1515import NIO
1616
17- public enum RedisInboundError : Error {
18- case UnexpectedStartByte( char: UInt8 , buffer: ByteBuffer )
19- case UnexpectedEndByte ( char: UInt8 , buffer: ByteBuffer )
20- case TransportError( Swift . Error )
21- case ProtocolError
22- case UnexpectedNegativeCount
23- case InternalInconsistency
24- }
25-
2617final class RedisChannelHandler : ChannelInboundHandler ,
2718 ChannelOutboundHandler
2819{
2920
30- typealias InboundErr = RedisInboundError
21+ typealias InboundErr = RESPParserError
3122
3223 typealias InboundIn = ByteBuffer
3324 typealias InboundOut = RESPValue
@@ -38,6 +29,7 @@ final class RedisChannelHandler : ChannelInboundHandler,
3829 let nilStringBuffer = ConstantBuffers . nilStringBuffer
3930 let nilArrayBuffer = ConstantBuffers . nilArrayBuffer
4031
32+ private final var parser = RESPParser ( )
4133
4234 // MARK: - Channel Open/Close
4335
@@ -59,259 +51,18 @@ final class RedisChannelHandler : ChannelInboundHandler,
5951
6052 // MARK: - Reading
6153
62- @inline ( __always)
63- private func decoded( value: RESPValue , in ctx: ChannelHandlerContext ) {
64- if let arrayContext = arrayContext {
65- _ = arrayContext. append ( value: value)
66-
67- if arrayContext. isDone {
68- let v = RESPValue . array ( arrayContext. values)
69-
70- if let parent = arrayContext. parent {
71- self . arrayContext = parent
72- }
73- else {
74- self . arrayContext = nil
75- if cachedParseContext == nil {
76- cachedParseContext = arrayContext
77- arrayContext. values = ContiguousArray ( )
78- arrayContext. values. reserveCapacity ( 16 )
79- }
80- }
81- decoded ( value: v, in: ctx)
82- }
83- }
84- else {
85- ctx. fireChannelRead ( self . wrapInboundOut ( value) )
86- }
87- }
88-
8954 public func channelRead( ctx: ChannelHandlerContext , data: NIOAny ) {
9055 do {
9156 let buffer = self . unwrapInboundIn ( data)
92-
93- try buffer. withUnsafeReadableBytes { bp in
94- let count = bp. count
95- var i = 0
96-
97- @inline ( __always)
98- func doSkipNL( ) {
99- if i >= count {
100- overflowSkipNL = true
101- }
102- else {
103- if bp [ i] == 10 /* LF */ { i += 1 }
104- overflowSkipNL = false
105- }
106- }
107-
108- if overflowSkipNL { doSkipNL ( ) }
109-
110- while i < count {
111- let c = bp [ i] ; i += 1
112-
113- switch state {
114-
115- case . protocolError:
116- throw InboundErr . ProtocolError
117-
118- case . start:
119- switch c {
120- case 43 /* + */: state = . simpleString
121- case 45 /* - */: state = . error
122- case 58 /* : */: state = . integer
123- case 36 /* $ */: state = . bulkStringLen
124- case 42 /* * */: state = . arrayCount
125- default : state = . telnet
126- }
127- countValue = 0
128- if state == . telnet || state == . simpleString || state == . error {
129- overflowBuffer = ctx. channel. allocator. buffer ( capacity: 80 )
130- overflowBuffer? . write ( integer: c)
131- }
132- else {
133- overflowBuffer = nil
134- }
135-
136- case . telnet:
137- assert ( overflowBuffer != nil , " missing overflow buffer " )
138- if c == 13 || c == 10 {
139- if c == 13 { doSkipNL ( ) }
140- let count = overflowBuffer? . readableBytes ?? 0
141- if count > 0 {
142- // just a quick hack for telnet mode
143- guard let s = overflowBuffer? . readString ( length: count) else {
144- throw InboundErr . ProtocolError
145- }
146- let vals = s. components ( separatedBy: " " )
147- . lazy. map { RESPValue ( bulkString: $0) }
148- decoded ( value: . array( ContiguousArray ( vals) ) , in: ctx)
149- }
150- }
151- else {
152- overflowBuffer? . write ( integer: c)
153- }
154-
155- case . arrayCount, . bulkStringLen, . integer:
156- let c0 : UInt8 = 48 , c9 : UInt8 = 57 , cMinus : UInt8 = 45
157- if c >= c0 && c <= c9 {
158- let digit = c - c0
159- countValue = ( countValue * 10 ) + Int( digit)
160- }
161- else if !hadMinus && c == cMinus && countValue == 0 {
162- hadMinus = true
163- }
164- else if c == 13 || c == 10 {
165- let doNegate = hadMinus
166- hadMinus = false
167- if c == 13 { doSkipNL ( ) }
168-
169- switch state {
170-
171- case . arrayCount:
172- if doNegate {
173- guard countValue == 1 else {
174- self . state = . protocolError
175- throw InboundErr . UnexpectedNegativeCount
176- }
177- decoded ( value: . array( nil ) , in: ctx)
178- }
179- else {
180- if countValue > 0 {
181- arrayContext =
182- makeArrayParseContext ( arrayContext, countValue)
183- }
184- else {
185- // push an empty array
186- decoded ( value: . array( [ ] ) , in: ctx)
187- }
188- }
189- state = . start
190-
191- case . bulkStringLen:
192- if doNegate {
193- state = . start
194- decoded ( value: . bulkString( nil ) , in: ctx)
195- }
196- else {
197- if ( count - i) >= ( countValue + 2 ) { // include CRLF
198- let value = buffer. getSlice ( at: buffer. readerIndex + i,
199- length: countValue) !
200- i += countValue
201- decoded ( value: . bulkString( value) , in: ctx)
202-
203- let ec = bp [ i]
204- guard ec == 13 || ec == 10 else {
205- self . state = . protocolError
206- throw InboundErr . UnexpectedStartByte ( char: bp [ i] ,
207- buffer: buffer)
208- }
209- i += 1
210- if ec == 13 { doSkipNL ( ) }
211-
212- state = . start
213- }
214- else {
215- state = . bulkStringValue
216- overflowBuffer =
217- ctx. channel. allocator. buffer ( capacity: countValue + 1 )
218- }
219- }
220-
221- case . integer:
222- let value = doNegate ? - countValue : countValue
223- countValue = 0 // reset
224-
225- decoded ( value: . integer( value) , in: ctx)
226- state = . start
227-
228- default :
229- assertionFailure ( " unexpected enum case \( state) " )
230- state = . protocolError
231- throw InboundErr . InternalInconsistency
232- }
233- }
234- else {
235- self . state = . protocolError
236- throw InboundErr . UnexpectedStartByte ( char: c, buffer: buffer)
237- }
238-
239- case . bulkStringValue:
240- let pending = countValue - ( overflowBuffer? . readableBytes ?? 0 )
241-
242- if pending > 0 {
243- overflowBuffer? . write ( integer: c)
244- let stillPending = pending - 1
245- let avail = min ( stillPending, ( count - i) )
246- if avail > 0 {
247- overflowBuffer? . write ( bytes: bp [ i..< ( i + avail) ] )
248- i += avail
249- }
250- }
251- else if pending == 0 && ( c == 13 || c == 10 ) {
252- if c == 13 { doSkipNL ( ) }
253-
254- let value = overflowBuffer
255- overflowBuffer = nil
256-
257- decoded ( value: . bulkString( value) , in: ctx)
258- state = . start
259- }
260- else {
261- self . state = . protocolError
262- throw InboundErr . UnexpectedEndByte ( char: c, buffer: buffer)
263- }
264-
265- case . simpleString, . error:
266- assert ( overflowBuffer != nil , " missing overflow buffer " )
267- if c == 13 || c == 10 {
268- if c == 13 { doSkipNL ( ) }
269-
270- if state == . simpleString {
271- if let v = overflowBuffer {
272- decoded ( value: . simpleString( v) , in: ctx)
273- }
274- }
275- else {
276- // TODO: make nice :-)
277- let avail = overflowBuffer? . readableBytes ?? 0
278- let value = overflowBuffer? . readBytes ( length: avail) ?? [ ]
279- let pair = value. split ( separator: 32 , maxSplits: 1 )
280- let code = pair. count > 0 ? String . decode ( utf8: pair [ 0 ] ) ?? " " : " "
281- let msg = pair. count > 1 ? String . decode ( utf8: pair [ 1 ] ) ?? " " : " "
282- let error = RESPError ( code: code, message: msg)
283- decoded ( value: . error( error) , in: ctx)
284- }
285- overflowBuffer = nil
286-
287- state = . start
288- }
289- else {
290- overflowBuffer? . write ( integer: c)
291- }
292- }
293- }
57+ try parser. feed ( buffer) { respValue in
58+ ctx. fireChannelRead ( self . wrapInboundOut ( respValue) )
29459 }
29560 }
29661 catch {
29762 ctx. fireErrorCaught ( error)
29863 ctx. close ( promise: nil )
29964 return
30065 }
301-
302-
303- // finish up.
304-
305- if let arrayContext = arrayContext {
306- if arrayContext. isDone {
307- let values = arrayContext. values
308- self . arrayContext = nil
309- ctx. fireChannelRead ( self . wrapInboundOut ( . array( values) ) )
310- }
311- else {
312- // we leave the context around
313- }
314- }
31566 }
31667
31768 public func errorCaught( ctx: ChannelHandlerContext , error: Error ) {
0 commit comments