服务器之家:专注于服务器技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Java教程 - Spring Cloud Gateway 内存溢出的解决方案

Spring Cloud Gateway 内存溢出的解决方案

2021-10-13 15:10神殇彡 Java教程

这篇文章主要介绍了Spring Cloud Gateway 内存溢出的解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

记 Spring Cloud Gateway 内存溢出查询过程

环境配置:

  • org.springframework.boot : 2.1.4.RELEASE
  • org.springframework.cloud :Greenwich.SR1

事故记录:

由于网关存在 RequestBody 丢失的情况,顾采用了网上的通用解决方案,使用如下方式解决:

?
1
2
3
4
5
6
7
8
9
10
11
@Bean
public RouteLocator tpauditRoutes(RouteLocatorBuilder builder) {
    return builder.routes().route("gateway-post", r -> r.order(1)
       .method(HttpMethod.POST)
       .and()
       .readBody(String.class, requestBody -> {return true;}) # 重点在这
       .and()
       .path("/gateway/**")
       .filters(f -> {f.stripPrefix(1);return f;})
       .uri("lb://APP-API")).build();
}

测试环境,Spring Cloud Gateway 网关功能编写完成。开始进行测试环境压测。

正常采用梯度压测方式,最高用户峰值设置为400并发。经历两轮时长10分钟左右压测,没有异常情况出现。

中午吃饭时间,设置了1个小时的时间进行测试。

回来的时候系统报出如下异常

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2019-08-12 15:06:07,296 1092208 [reactor-http-server-epoll-12] WARN  io.netty.channel.AbstractChannelHandlerContext.warn:146 - An exception '{}' [enable DEBUG level for full stacktrace] was thrown by a user handler's exceptionCaught() method while handling the following exception:
io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 503316487, max: 504889344)
 at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:640)
 at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:594)
 at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:764)
 at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:740)
 at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244)
 at io.netty.buffer.PoolArena.allocate(PoolArena.java:214)
 at io.netty.buffer.PoolArena.allocate(PoolArena.java:146)
 at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:324)
 at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185)
 at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176)
 at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:137)
 at io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114)
 at io.netty.channel.epoll.EpollRecvByteAllocatorHandle.allocate(EpollRecvByteAllocatorHandle.java:72)
 at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:793)
 at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe$1.run(AbstractEpollChannel.java:382)
 at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
 at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
 at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:315)
 at io.

当时一脸懵逼,马上开始监控 Jvm 堆栈,减少jvm的内存空间,提升并发数以后,重启项目重新压测,

项目启动参数如下:

?
1
2
3
java -jar -Xmx1024M /opt/deploy/gateway-appapi/cloud-employ-gateway-0.0.5-SNAPSHOT.jar
↓↓↓↓修改为↓↓↓↓
java -jar -Xmx512M /opt/deploy/gateway-appapi/cloud-employ-gateway-0.0.5-SNAPSHOT.jar

缩减了一半内存启动,等待问题复现。等待3分钟问题再次复现,但是同时Jvm却的进行了Full GC。

?
1
2
3
     EC       EU        OC         OU       MC     MU    CCSC   CCSU   YGC     YGCT    FGC    FGCT  
275456.0 100103.0  484864.0   50280.2  67672.0 64001.3 9088.0 8463.2    501   11.945   3      0.262
275968.0 25072.3   484864.0   47329.3  67672.0 63959.4 9088.0 8448.8    502   11.970   4      0.429

没错,在出现问题的时候,系统出现了Full Gc,但是OU并没有达到触发的原因。

结合日志中的 direct memory,想到了Jvm 中的堆外内存。

使用 -XX:MaxDirectMemorySize 可以进行设置 Jvm 堆外内存大小,当 Direct ByteBuffer 分配的堆外内存到达指定大小后,即触发Full GC。

该值是有上限的,默认是64M,最大为 sun.misc.VM.maxDirectMemory()。

结合所有情况,表明堆外内存使用存在内存溢出的情况。

报错内容为Netty框架,新增以下配置,开启Netty错误日志打印:

?
1
2
-Dio.netty.leakDetection.targetRecords=40 #设置Records 上限
-Dio.netty.leakDetection.level=advanced   #设置日志级别

项目启动,没任何问题,开启压测后服务报出如下异常:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
2019-08-13 14:59:01,656 18047 [reactor-http-nio-7] ERROR io.netty.util.ResourceLeakDetector.reportTracedLeak:317 - LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
#1:
    org.springframework.core.io.buffer.NettyDataBuffer.release(NettyDataBuffer.java:301)
    org.springframework.core.io.buffer.DataBufferUtils.release(DataBufferUtils.java:420)
    org.springframework.core.codec.StringDecoder.decodeDataBuffer(StringDecoder.java:208)
    org.springframework.core.codec.StringDecoder.decodeDataBuffer(StringDecoder.java:59)
    org.springframework.core.codec.AbstractDataBufferDecoder.lambda$decodeToMono$1(AbstractDataBufferDecoder.java:68)
    reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107)
    reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:103)
    reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:287)
    reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:331)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1505)
    reactor.core.publisher.MonoCollectList$MonoBufferAllSubscriber.onComplete(MonoCollectList.java:123)
    reactor.core.publisher.FluxJust$WeakScalarSubscription.request(FluxJust.java:101)
    reactor.core.publisher.MonoCollectList$MonoBufferAllSubscriber.onSubscribe(MonoCollectList.java:90)
    reactor.core.publisher.FluxJust.subscribe(FluxJust.java:70)
    reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)
    reactor.core.publisher.MonoCollectList.subscribe(MonoCollectList.java:59)
    reactor.core.publisher.MonoFilterFuseable.subscribe(MonoFilterFuseable.java:44)
    reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:56)
    reactor.core.publisher.MonoSubscriberContext.subscribe(MonoSubscriberContext.java:47)
    reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59)
    reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)
    reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)
    reactor.core.publisher.MonoPeek.subscribe(MonoPeek.java:71)
    reactor.core.publisher.MonoMap.subscribe(MonoMap.java:55)
    reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150)
    reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:103)
    reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:287)
    reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:331)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1505)
    reactor.core.publisher.MonoCollectList$MonoBufferAllSubscriber.onComplete(MonoCollectList.java:123)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
    reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
    reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:372)
    reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:196)
    reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:337)
    reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:333)
    reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:453)
    reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:141)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
    reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:191)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
    io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
    io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
    io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
    io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
    io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:677)
    io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:612)
    io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:529)
    io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:491)
    io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
    java.lang.Thread.run(Unknown Source)
#2:
    io.netty.buffer.AdvancedLeakAwareByteBuf.nioBuffer(AdvancedLeakAwareByteBuf.java:712)
    org.springframework.core.io.buffer.NettyDataBuffer.asByteBuffer(NettyDataBuffer.java:266)
    org.springframework.core.codec.StringDecoder.decodeDataBuffer(StringDecoder.java:207)
    org.springframework.core.codec.StringDecoder.decodeDataBuffer(StringDecoder.java:59)
    org.springframework.core.codec.AbstractDataBufferDecoder.lambda$decodeToMono$1(AbstractDataBufferDecoder.java:68)
    reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107)
    reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:103)
    reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:287)
    reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:331)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1505)
    reactor.core.publisher.MonoCollectList$MonoBufferAllSubscriber.onComplete(MonoCollectList.java:123)
    reactor.core.publisher.FluxJust$WeakScalarSubscription.request(FluxJust.java:101)
    reactor.core.publisher.MonoCollectList$MonoBufferAllSubscriber.onSubscribe(MonoCollectList.java:90)
    reactor.core.publisher.FluxJust.subscribe(FluxJust.java:70)
    reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)
    reactor.core.publisher.MonoCollectList.subscribe(MonoCollectList.java:59)
    reactor.core.publisher.MonoFilterFuseable.subscribe(MonoFilterFuseable.java:44)
    reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:56)
    reactor.core.publisher.MonoSubscriberContext.subscribe(MonoSubscriberContext.java:47)
    reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59)
    reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)
    reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)
    reactor.core.publisher.MonoPeek.subscribe(MonoPeek.java:71)
    reactor.core.publisher.MonoMap.subscribe(MonoMap.java:55)
    reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150)
    reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:103)
    reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:287)
    reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:331)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1505)
    reactor.core.publisher.MonoCollectList$MonoBufferAllSubscriber.onComplete(MonoCollectList.java:123)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
    reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
    reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:372)
    reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:196)
    reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:337)
    reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:333)
    reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:453)
    reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:141)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
    reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:191)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
    io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
    io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
    io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
    io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
    io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:677)
    io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:612)
    io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:529)
    io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:491)
    io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
    java.lang.Thread.run(Unknown Source)
#3:
    io.netty.buffer.AdvancedLeakAwareByteBuf.slice(AdvancedLeakAwareByteBuf.java:82)
    org.springframework.core.io.buffer.NettyDataBuffer.slice(NettyDataBuffer.java:260)
    org.springframework.core.io.buffer.NettyDataBuffer.slice(NettyDataBuffer.java:42)
    org.springframework.cloud.gateway.handler.predicate.ReadBodyPredicateFactory.lambda$null$0(ReadBodyPredicateFactory.java:102)
    reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:46)
    reactor.core.publisher.MonoCollectList.subscribe(MonoCollectList.java:59)
    reactor.core.publisher.MonoFilterFuseable.subscribe(MonoFilterFuseable.java:44)
    reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:56)
    reactor.core.publisher.MonoSubscriberContext.subscribe(MonoSubscriberContext.java:47)
    reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59)
    reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)
    reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)
    reactor.core.publisher.MonoPeek.subscribe(MonoPeek.java:71)
    reactor.core.publisher.MonoMap.subscribe(MonoMap.java:55)
    reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150)
    reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:103)
    reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:287)
    reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:331)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1505)
    reactor.core.publisher.MonoCollectList$MonoBufferAllSubscriber.onComplete(MonoCollectList.java:123)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
    reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
    reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:372)
    reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:196)
    reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:337)
    reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:333)
    reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:453)
    reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:141)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
    reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:191)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
    io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
    io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
    io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
    io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
    io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:677)
    io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:612)
    io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:529)
    io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:491)
    io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
    java.lang.Thread.run(Unknown Source)

在 #3 中,我发现了一个眼熟的类,ReadBodyPredicateFactory.java ,还记得最开始的时候使用 readbody 配置么?

这里就是进行 cachedRequestBodyObject 的写入类,

追踪一下Readbody源码

?
1
2
3
4
5
6
7
8
9
10
11
12
/**
 * This predicate is BETA and may be subject to change in a future release. A
 * predicate that checks the contents of the request body
 * @param inClass the class to parse the body to
 * @param predicate a predicate to check the contents of the body
 * @param <T> the type the body is parsed to
 * @return a {@link BooleanSpec} to be used to add logical operators
 */
public <T> BooleanSpec readBody(Class<T> inClass, Predicate<T> predicate) {
 return asyncPredicate(getBean(ReadBodyPredicateFactory.class)
   .applyAsync(c -> c.setPredicate(inClass, predicate)));
}

异步调用的 ReadBodyPredicateFactory.applyAsync() 和 错误日志中的

?
1
org.springframework.cloud.gateway.handler.predicate.ReadBodyPredicateFactory.lambda$null$0(ReadBodyPredicateFactory.java:102)

指向方法一致。查看源码102行:

?
1
2
3
Flux<DataBuffer> cachedFlux = Flux.defer(() ->
 Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount()))
);

此处 Spring Cloud Gateway 通过 dataBuffer.slice 切割出了新的 dataBuffer,但是通过 Netty 的内存检测工具判断,此处的 dataBuffer 并没有被回收。

错误如下,日志很多容易被忽视。

ERROR io.netty.util.ResourceLeakDetector.reportTracedLeak:317 - LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information.

找到问题那就要解决才行,尝试修改源码

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@Override
@SuppressWarnings("unchecked")
public AsyncPredicate<ServerWebExchange> applyAsync(Config config) {
    return exchange -> {
        Class inClass = config.getInClass();
 
        Object cachedBody = exchange.getAttribute(CACHE_REQUEST_BODY_OBJECT_KEY);
        Mono<?> modifiedBody;
        // We can only read the body from the request once, once that
        // happens if we
        // try to read the body again an exception will be thrown. The below
        // if/else
        // caches the body object as a request attribute in the
        // ServerWebExchange
        // so if this filter is run more than once (due to more than one
        // route
        // using it) we do not try to read the request body multiple times
        if (cachedBody != null) {
            try {
                boolean test = config.predicate.test(cachedBody);
                exchange.getAttributes().put(TEST_ATTRIBUTE, test);
                return Mono.just(test);
            } catch (ClassCastException e) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Predicate test failed because class in predicate "
                            + "does not match the cached body object", e);
                }
            }
            return Mono.just(false);
        } else {
            // Join all the DataBuffers so we have a single DataBuffer for
            // the body
            return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {
                // Update the retain counts so we can read the body twice,
                // once to parse into an object
                // that we can test the predicate against and a second time
                // when the HTTP client sends
                // the request downstream
                // Note: if we end up reading the body twice we will run
                // into
                // a problem, but as of right
                // now there is no good use case for doing this
                DataBufferUtils.retain(dataBuffer);
                // Make a slice for each read so each read has its own
                // read/write indexes
                Flux<DataBuffer> cachedFlux = Flux
                        .defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));
 
                ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
                    @Override
                    public Flux<DataBuffer> getBody() {
                        return cachedFlux;
                    }
                };
                # 新增如下代码
                DataBufferUtils.release(dataBuffer);
 
                return ServerRequest.create(exchange.mutate().request(mutatedRequest).build(), messageReaders)
                        .bodyToMono(inClass).doOnNext(objectValue -> {
                            exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue);
                            exchange.getAttributes().put(CACHED_REQUEST_BODY_KEY, cachedFlux);
                        }).map(objectValue -> config.predicate.test(objectValue));
            });
 
        }
    };
}

Spring Cloud Gateway 在配置的架构中,版本为2.1.1,修改以上代码后,启动项目测试,问题没有复现,正常运行。

同样这个问题,也可以选择升级 Spring Cloud Gateway 版本,在官方2.1.2版本中,此处代码已被重构,升级后测试也完全正常。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持服务器之家。

原文链接:https://blog.csdn.net/live501837145/article/details/99446673

延伸 · 阅读

精彩推荐