[jira] [Created] (FLINK-11859) Improve SpanningRecordSerializer performance by serializing record length to serialization buffer directly


Shang Yuanchun (Jira)
Yingjie Cao created FLINK-11859:
-----------------------------------

             Summary: Improve SpanningRecordSerializer performance by serializing record length to serialization buffer directly
                 Key: FLINK-11859
                 URL: https://issues.apache.org/jira/browse/FLINK-11859
             Project: Flink
          Issue Type: Improvement
            Reporter: Yingjie Cao
            Assignee: Yingjie Cao


In the current implementation of SpanningRecordSerializer, the length of a record is serialized to an intermediate length buffer and then copied to the target buffer. The length field can instead be serialized directly to the data buffer (serializationBuffer), which avoids copying the length buffer. Although the total number of bytes copied remains unchanged, this eliminates one copy of a very small amount of data, and such small copies incur high overhead. The flink-benchmarks results show that this improves performance; the test results are as follows.

Result with the optimization:
|Benchmark|Mode|Threads|Samples|Score|Score Error (99.9%)|Unit|Param: channelsFlushTimeout|Param: stateBackend|
|KeyByBenchmarks.arrayKeyBy|thrpt|1|30|2228.049605|77.631804|ops/ms| | |
|KeyByBenchmarks.tupleKeyBy|thrpt|1|30|3968.361739|193.501755|ops/ms| | |
|MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|3030.016702|29.272713|ops/ms| |MEMORY|
|MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2754.77678|26.215395|ops/ms| |FS|
|MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|3001.957606|29.288019|ops/ms| |FS_ASYNC|
|RocksStateBackendBenchmark.stateBackends|thrpt|1|30|123.698984|3.339233|ops/ms| |ROCKS|
|RocksStateBackendBenchmark.stateBackends|thrpt|1|30|126.252137|1.137735|ops/ms| |ROCKS_INC|
|SerializationFrameworkMiniBenchmarks.serializerAvro|thrpt|1|30|323.658098|5.855697|ops/ms| | |
|SerializationFrameworkMiniBenchmarks.serializerKryo|thrpt|1|30|183.34423|3.710787|ops/ms| | |
|SerializationFrameworkMiniBenchmarks.serializerPojo|thrpt|1|30|404.380233|5.131744|ops/ms| | |
|SerializationFrameworkMiniBenchmarks.serializerRow|thrpt|1|30|527.193369|10.176726|ops/ms| | |
|SerializationFrameworkMiniBenchmarks.serializerTuple|thrpt|1|30|550.073024|11.724412|ops/ms| | |
|StreamNetworkBroadcastThroughputBenchmarkExecutor.networkBroadcastThroughput|thrpt|1|30|564.690627|13.766809|ops/ms| | |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|49918.11806|2324.234776|ops/ms|100,100ms| |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|10443.63491|315.835962|ops/ms|100,100ms,SSL| |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|21387.47608|2779.832704|ops/ms|1000,1ms| |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|26585.85453|860.243347|ops/ms|1000,100ms| |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|8252.563405|947.129028|ops/ms|1000,100ms,SSL| |
|SumLongsBenchmark.benchmarkCount|thrpt|1|30|8806.021402|263.995836|ops/ms| | |
|WindowBenchmarks.globalWindow|thrpt|1|30|4573.620126|112.099391|ops/ms| | |
|WindowBenchmarks.sessionWindow|thrpt|1|30|585.246412|7.026569|ops/ms| | |
|WindowBenchmarks.slidingWindow|thrpt|1|30|449.302134|4.123669|ops/ms| | |
|WindowBenchmarks.tumblingWindow|thrpt|1|30|2979.806858|33.818909|ops/ms| | |
|StreamNetworkLatencyBenchmarkExecutor.networkLatency1to1|avgt|1|30|12.842865|0.13796|ms/op| | |

Result without the optimization:
|Benchmark|Mode|Threads|Samples|Score|Score Error (99.9%)|Unit|Param: channelsFlushTimeout|Param: stateBackend|
|KeyByBenchmarks.arrayKeyBy|thrpt|1|30|2060.241715|59.898485|ops/ms| | |
|KeyByBenchmarks.tupleKeyBy|thrpt|1|30|3645.306819|223.821719|ops/ms| | |
|MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2992.698822|36.978115|ops/ms| |MEMORY|
|MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2756.10949|27.798937|ops/ms| |FS|
|MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2965.969876|44.159793|ops/ms| |FS_ASYNC|
|RocksStateBackendBenchmark.stateBackends|thrpt|1|30|125.506942|1.245978|ops/ms| |ROCKS|
|RocksStateBackendBenchmark.stateBackends|thrpt|1|30|127.258737|1.190588|ops/ms| |ROCKS_INC|
|SerializationFrameworkMiniBenchmarks.serializerAvro|thrpt|1|30|316.497954|8.309241|ops/ms| | |
|SerializationFrameworkMiniBenchmarks.serializerKryo|thrpt|1|30|189.065149|6.302073|ops/ms| | |
|SerializationFrameworkMiniBenchmarks.serializerPojo|thrpt|1|30|391.51305|7.750728|ops/ms| | |
|SerializationFrameworkMiniBenchmarks.serializerRow|thrpt|1|30|513.611151|10.640899|ops/ms| | |
|SerializationFrameworkMiniBenchmarks.serializerTuple|thrpt|1|30|534.184947|14.370082|ops/ms| | |
|StreamNetworkBroadcastThroughputBenchmarkExecutor.networkBroadcastThroughput|thrpt|1|30|483.388618|19.506723|ops/ms| | |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|42777.70615|4981.87539|ops/ms|100,100ms| |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|10201.48525|286.248845|ops/ms|100,100ms,SSL| |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|20788.34364|1146.470652|ops/ms|1000,1ms| |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|24412.00941|981.98882|ops/ms|1000,100ms| |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|8284.336114|177.482373|ops/ms|1000,100ms,SSL| |
|SumLongsBenchmark.benchmarkCount|thrpt|1|30|7846.800667|127.321584|ops/ms| | |
|WindowBenchmarks.globalWindow|thrpt|1|30|4837.270101|94.519852|ops/ms| | |
|WindowBenchmarks.sessionWindow|thrpt|1|30|591.304589|5.324132|ops/ms| | |
|WindowBenchmarks.slidingWindow|thrpt|1|30|446.605784|2.53677|ops/ms| | |
|WindowBenchmarks.tumblingWindow|thrpt|1|30|2878.885056|64.035709|ops/ms| | |
|StreamNetworkLatencyBenchmarkExecutor.networkLatency1to1|avgt|1|30|12.705601|0.164959|ms/op| | |
 
The optimization is especially useful for small records.
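
The idea described at the top of this issue can be illustrated with a minimal sketch. It is not the actual SpanningRecordSerializer code: it uses plain java.nio.ByteBuffer instead of Flink's internal buffer classes, assumes a 4-byte length prefix, and the names LengthPrefixSketch, frameViaLengthBuffer and frameInPlace are hypothetical. The first method writes the length to a separate buffer and copies it; the second reserves space at the front of the data buffer, writes the payload, and then fills in the length in place.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative sketch only; class and method names are hypothetical.
final class LengthPrefixSketch {

    // Assumption: the record length is stored as a 4-byte int prefix.
    static final int LENGTH_BYTES = 4;

    // Variant with an intermediate length buffer: the length is written
    // to its own small buffer, which is then copied into the target.
    static ByteBuffer frameViaLengthBuffer(byte[] payload, ByteBuffer target) {
        ByteBuffer lengthBuffer = ByteBuffer.allocate(LENGTH_BYTES);
        lengthBuffer.putInt(payload.length);
        lengthBuffer.flip();

        target.clear();
        target.put(lengthBuffer); // extra copy of a 4-byte buffer
        target.put(payload);
        target.flip();
        return target;
    }

    // Variant without the intermediate buffer: reserve the prefix,
    // write the payload, then write the length at offset 0.
    static ByteBuffer frameInPlace(byte[] payload, ByteBuffer dataBuffer) {
        dataBuffer.clear();
        dataBuffer.position(LENGTH_BYTES); // skip the space for the length
        dataBuffer.put(payload);
        int length = dataBuffer.position() - LENGTH_BYTES;
        dataBuffer.putInt(0, length);      // absolute write, position unchanged
        dataBuffer.flip();
        return dataBuffer;
    }

    public static void main(String[] args) {
        byte[] payload = "record".getBytes(StandardCharsets.UTF_8);
        ByteBuffer a = frameViaLengthBuffer(payload, ByteBuffer.allocate(64));
        ByteBuffer b = frameInPlace(payload, ByteBuffer.allocate(64));
        System.out.println("frames identical: " + a.equals(b));
    }
}
```

Both variants produce the same bytes; the second simply skips the separate small copy, which is the effect the benchmarks above measure.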
 


