Yingjie Cao created FLINK-11859:
----------------------------------- Summary: Improve SpanningRecordSerializer performance by serializing record length to serialization buffer directly Key: FLINK-11859 URL: https://issues.apache.org/jira/browse/FLINK-11859 Project: Flink Issue Type: Improvement Reporter: Yingjie Cao Assignee: Yingjie Cao In the current implementation of SpanningRecordSerializer, the length of a record is serialized to an intermediate length buffer and then copied to the target buffer. Actually, the length filed can be serialized directly to the data buffer (serializationBuffer), which can avoid the copy of length buffer. Though the total bytes copied remain unchanged, it one copy of a small record which incurs high overhead. The flink-benchmarks shows it can improve performance and the test results are as follows. Result with the optimization: |Benchmark|Mode|Threads|Samples|Score|Score Error (99.9%)|Unit|Param: channelsFlushTimeout|Param: stateBackend| |KeyByBenchmarks.arrayKeyBy|thrpt|1|30|2228.049605|77.631804|ops/ms| | | |KeyByBenchmarks.tupleKeyBy|thrpt|1|30|3968.361739|193.501755|ops/ms| | | |MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|3030.016702|29.272713|ops/ms| |MEMORY| |MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2754.77678|26.215395|ops/ms| |FS| |MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|3001.957606|29.288019|ops/ms| |FS_ASYNC| |RocksStateBackendBenchmark.stateBackends|thrpt|1|30|123.698984|3.339233|ops/ms| |ROCKS| |RocksStateBackendBenchmark.stateBackends|thrpt|1|30|126.252137|1.137735|ops/ms| |ROCKS_INC| |SerializationFrameworkMiniBenchmarks.serializerAvro|thrpt|1|30|323.658098|5.855697|ops/ms| | | |SerializationFrameworkMiniBenchmarks.serializerKryo|thrpt|1|30|183.34423|3.710787|ops/ms| | | |SerializationFrameworkMiniBenchmarks.serializerPojo|thrpt|1|30|404.380233|5.131744|ops/ms| | | |SerializationFrameworkMiniBenchmarks.serializerRow|thrpt|1|30|527.193369|10.176726|ops/ms| | | |SerializationFrameworkMiniBenchmarks.serializerTuple|thrpt|1|30|550.073024|11.724412|ops/ms| | | |StreamNetworkBroadcastThroughputBenchmarkExecutor.networkBroadcastThroughput|thrpt|1|30|564.690627|13.766809|ops/ms| | | |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|49918.11806|2324.234776|ops/ms|100,100ms| | |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|10443.63491|315.835962|ops/ms|100,100ms,SSL| | |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|21387.47608|2779.832704|ops/ms|1000,1ms| | |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|26585.85453|860.243347|ops/ms|1000,100ms| | |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|8252.563405|947.129028|ops/ms|1000,100ms,SSL| | |SumLongsBenchmark.benchmarkCount|thrpt|1|30|8806.021402|263.995836|ops/ms| | | |WindowBenchmarks.globalWindow|thrpt|1|30|4573.620126|112.099391|ops/ms| | | |WindowBenchmarks.sessionWindow|thrpt|1|30|585.246412|7.026569|ops/ms| | | |WindowBenchmarks.slidingWindow|thrpt|1|30|449.302134|4.123669|ops/ms| | | |WindowBenchmarks.tumblingWindow|thrpt|1|30|2979.806858|33.818909|ops/ms| | | |StreamNetworkLatencyBenchmarkExecutor.networkLatency1to1|avgt|1|30|12.842865|0.13796|ms/op| | | Result without the optimization: |Benchmark|Mode|Threads|Samples|Score|Score Error (99.9%)|Unit|Param: channelsFlushTimeout|Param: stateBackend| |KeyByBenchmarks.arrayKeyBy|thrpt|1|30|2060.241715|59.898485|ops/ms| | | |KeyByBenchmarks.tupleKeyBy|thrpt|1|30|3645.306819|223.821719|ops/ms| | | |MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2992.698822|36.978115|ops/ms| |MEMORY| |MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2756.10949|27.798937|ops/ms| |FS| |MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2965.969876|44.159793|ops/ms| |FS_ASYNC| |RocksStateBackendBenchmark.stateBackends|thrpt|1|30|125.506942|1.245978|ops/ms| |ROCKS| |RocksStateBackendBenchmark.stateBackends|thrpt|1|30|127.258737|1.190588|ops/ms| |ROCKS_INC| |SerializationFrameworkMiniBenchmarks.serializerAvro|thrpt|1|30|316.497954|8.309241|ops/ms| | | |SerializationFrameworkMiniBenchmarks.serializerKryo|thrpt|1|30|189.065149|6.302073|ops/ms| | | |SerializationFrameworkMiniBenchmarks.serializerPojo|thrpt|1|30|391.51305|7.750728|ops/ms| | | |SerializationFrameworkMiniBenchmarks.serializerRow|thrpt|1|30|513.611151|10.640899|ops/ms| | | |SerializationFrameworkMiniBenchmarks.serializerTuple|thrpt|1|30|534.184947|14.370082|ops/ms| | | |StreamNetworkBroadcastThroughputBenchmarkExecutor.networkBroadcastThroughput|thrpt|1|30|483.388618|19.506723|ops/ms| | | |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|42777.70615|4981.87539|ops/ms|100,100ms| | |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|10201.48525|286.248845|ops/ms|100,100ms,SSL| | |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|20788.34364|1146.470652|ops/ms|1000,1ms| | |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|24412.00941|981.98882|ops/ms|1000,100ms| | |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|8284.336114|177.482373|ops/ms|1000,100ms,SSL| | |SumLongsBenchmark.benchmarkCount|thrpt|1|30|7846.800667|127.321584|ops/ms| | | |WindowBenchmarks.globalWindow|thrpt|1|30|4837.270101|94.519852|ops/ms| | | |WindowBenchmarks.sessionWindow|thrpt|1|30|591.304589|5.324132|ops/ms| | | |WindowBenchmarks.slidingWindow|thrpt|1|30|446.605784|2.53677|ops/ms| | | |WindowBenchmarks.tumblingWindow|thrpt|1|30|2878.885056|64.035709|ops/ms| | | |StreamNetworkLatencyBenchmarkExecutor.networkLatency1to1|avgt|1|30|12.705601|0.164959|ms/op| | | The optimization is especially useful for small records. -- This message was sent by Atlassian JIRA (v7.6.3#76005) |
Free forum by Nabble | Edit this page |