Skip to content

RxRingBuffer fixes and improvements#2333

Closed
akarnokd wants to merge 1 commit intoReactiveX:1.xfrom
akarnokd:RxRingBufferFix
Closed

RxRingBuffer fixes and improvements#2333
akarnokd wants to merge 1 commit intoReactiveX:1.xfrom
akarnokd:RxRingBufferFix

Conversation

@akarnokd
Copy link
Copy Markdown
Member

@akarnokd akarnokd commented Jan 6, 2015

This PR contains the fixes and improvements on the RxRingBuffer and its single-consumer-single-producer queue.

  • Added SWSRPhaser which is a variant of Gil Tene's WriterReaderPhaser that uses cheaper atomic operations because the single reader and single writer use case. Note that pre Java 8 Unsafe doesn't support atomic addAndGetLong operation. The simplified phaser costs only a single atomic increment per use.
  • Updated SpscArrayQueue to match JCTools' current version: the queue now can be fully utilized to its capacity.
  • The RxRingBuffer now uses two phasers: one for the offer side and one for the poll/peek side. The benefits: reduced interference between readers and writers; allows using the simplified phaser because each side is now single threaded (a shared phaser implies up to 2 threads at once).

Benchmark results:

Benchmark              (size)        1.x   |      this   
1SyncStreamOfN              1  3779678,748 | 3767936,028 
1SyncStreamOfN           1000    21250,675 |   18530,542 
1SyncStreamOfN        1000000       20,406 |      17,712 
NAsyncStreamsOfN            1   115390,116 |  115629,480 
NAsyncStreamsOfN         1000        2,579 |       2,546 
NSyncStreamsOf1             1  3543551,254 | 3602242,709 
NSyncStreamsOf1           100   299166,910 |  301703,721 
NSyncStreamsOf1          1000    28404,751 |   28420,833 
NSyncStreamsOfN             1  4054571,577 | 4003156,953 
NSyncStreamsOfN          1000       24,324 |      20,601 
TwoAsyncStreamsOfN          1    85846,727 |   85682,983 
TwoAsyncStreamsOfN       1000     1823,137 |    1889,458 
reamOfNthatMergesIn1        1  3724179,351 | 3725068,220 
reamOfNthatMergesIn1     1000    19051,928 |   19392,595 
reamOfNthatMergesIn1  1000000       18,265 |      18,069

@daschl
Copy link
Copy Markdown
Contributor

daschl commented Jan 7, 2015

@akarnokd is that ops/s? Why is it slower in some cases too?

edit: it seems that it's slower in most of the cases? if it is really ops/s

@akarnokd
Copy link
Copy Markdown
Member Author

akarnokd commented Jan 7, 2015

Yes, ops per second. These are the most noticeably slower ones:

1SyncStreamOfN           1000    21250,675 |   18530,542
1SyncStreamOfN        1000000       20,406 |      17,712
NSyncStreamsOfN          1000       24,324 |      20,601

I haven't dig into it but my guess is that the atomic increment-and-get on the peek() and poll(). Some queue users first call peek() and if it returns something, then they do a full poll(). This is 2 increments per value instead of none in 1.x. I've been thinking about the option to remove the phaser from peek() since it doesn't change any queue state. (Alternatively, the two might be merged into a pollIf(Predicate consume) where the callback would tell to remove the value or not.)

Edit: correction, the only place peek() is used is in zip tick, so it is one atomic increment per poll instead of 0.

@akarnokd akarnokd closed this Jan 21, 2015
@akarnokd akarnokd reopened this Jan 21, 2015
@zsxwing
Copy link
Copy Markdown
Member

zsxwing commented Jan 21, 2015

@akarnokd you can click "Restart build" in Travis CI.

@akarnokd
Copy link
Copy Markdown
Member Author

@zsxwing Thanks, I never logged into Travis so did not see the button.

@benjchristensen
Copy link
Copy Markdown
Member

@akarnokd Is this the one you think is ready for merge? If so I'll do my tests with Flight Recorder to appease my concern on memory and GC behavior.

@akarnokd
Copy link
Copy Markdown
Member Author

Yes it is.

@benjchristensen
Copy link
Copy Markdown
Member

Here is the outcome of my perf testing on this:

 ./gradlew benchmarks '-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*OperatorMergePerf.*'

Benchmark                                          (size)   Mode   Samples          1.x           PR
r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  5052690.265  4989944.158 
r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    43229.405    34265.533 --
r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       41.317       30.750 --
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5   101442.051    95688.061 
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.776        4.768
r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4640498.106  4525502.795
r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   446840.647   448934.827
r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    39857.078    39870.866
r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  4837426.647  5174591.967
r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       39.182       29.531 --
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    76236.959    76523.262 
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     3073.535     2556.523 -
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  4984161.207  4745798.100 
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    37588.942    35720.766
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       34.425       31.146

Flight Recorder testing against first minute of "Iteration" stage of this JMH run:

./gradlew benchmarks '-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 20 .*OperatorMergePerf.merge1SyncStreamOfN.*'

1.x Baseline Overview

baseline-overview

This PR Overview
pr-overview

1.x Baseline Allocations

baseline-allocations

This PR Allocations
pr-allocations

@benjchristensen
Copy link
Copy Markdown
Member

The CPU usage appears to be higher on this PR (the average is 19 versus 17). Not sure if there are other changes in 1.x that could affect this as this PR is 17 days old.

As per previous comments there are 3 perf tests that take a noticeable performance hit.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to keep these comments in here.

@benjchristensen
Copy link
Copy Markdown
Member

I suggest that we merge the JCTools SpscArrayQueue fixes in via another PR since we want those regardless of what else we do and so comparisons across approaches are equivalent.

I'm not yet ready to accept the performance hit this PR gives.

@benjchristensen
Copy link
Copy Markdown
Member

Here is comparing #2189 and this PR to 1.x. I rebased both PRs onto the current 1.x branch to try and be as accurate as possible:

1.x == current 1.x branch
PR2333 == PR #2333 after rebasing onto 1.x
PR2189 == PR #2189 after rebasing onto 1.x

Benchmark                                          (size)   Mode   Samples          1.x        PR2189        PR2333
r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  4879535.663   4773501.735   4807008.476
r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    43295.567     37341.214     34950.819
r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       41.255        40.202        32.236
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5    99885.768    101321.745     97689.264
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.846         4.715         4.973
r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4684222.432   4747380.010   4751592.996
r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   456736.726    468609.567    467310.110
r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    40504.652     41472.463     41146.594
r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  4993477.475   5268523.818   5414652.857
r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       44.460        42.820        32.926
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    79546.448     76853.391     73846.697
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     3138.932      3140.582      2672.720
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  5248113.569   5136570.967   5225289.115
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    39001.895     39254.876     39235.506
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       35.039        35.282        33.375

@akarnokd akarnokd closed this Jan 28, 2015
@akarnokd akarnokd deleted the RxRingBufferFix branch February 2, 2015 11:09
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants