-
Notifications
You must be signed in to change notification settings - Fork 7.6k
1.x: concat reduce overhead when streaming a source #3589
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
👍 |
arbiter.produced(c); | ||
AtomicLong requestedField = requested; | ||
if (requestedField.get() != Long.MAX_VALUE) { | ||
requestedField.addAndGet(-c); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Call getAndAddRequest
here? In case requested
is changed to Long.MAX_VALUE
at the same time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In normal use, such change never happens; consumers either start with MAX_VALUE or some prefetch value, plus you have to be very precise to hit the narrow window between the two lines.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd prefer to see correctness. Speculating on the probability of a race
condition seems like dangerous territory to be entering.
On Sun, 31 Jan 2016 21:24 David Karnok [email protected] wrote:
In src/main/java/rx/internal/operators/OperatorConcat.java
#3589 (comment):@@ -179,6 +187,14 @@ void subscribeNext() {
}
}
}
+
void produced(long c) {
arbiter.produced(c);
AtomicLong requestedField = requested;
if (requestedField.get() != Long.MAX_VALUE) {
requestedField.addAndGet(-c);
In normal use, such change never happens; consumers either start with
MAX_VALUE or some prefetch value, plus you have to be very precise to hit
the narrow window between the two lines.—
Reply to this email directly or view it on GitHub
https://github.com/ReactiveX/RxJava/pull/3589/files#r51358378.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line is correct as it is, what we are talking about is considering the case where one wants to subvert the optimization by racing with get().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've a very narrow focus on this method so I could well be missing something but normally once requested
is set to Long.MAX_VALUE we don't reduce its value and @zsxwing has indicated this possibility. I don't think it should be best endeavours as in I would expect to see a CAS loop here. Is this a bit fussy? Do we do this in lots of places already? (I thought a lot had been cleaned up).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The keyword is loop which can lenghten due to failed CAS in bounded mode or safepoint kicking in. AddAndGet is a one step atomic operation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So no other place will update requested
at the same time. Right?
e54f4d8
to
d4367d4
Compare
There is a non-zero chance there, but it is very small. If it happens, all the effect it will have is to make the operator decrement the requested amount from then on, which adds ~40 cycle overhead after each source completes. Updated the PR anyway to do a CAS loop on production to remove all worries about the case. |
d4367d4
to
bf671e3
Compare
👍 |
@zsxwing, @davidmoten are you satisfied with the name |
long previous; | ||
|
||
if (requestedField.get() != Long.MAX_VALUE) { | ||
previous = BackpressureUtils.getAndAddRequest(requestedField, n); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This path involves two AtomicLong.get()
. Can't the whole block just be replaced by
long previous = BackpressureUtils.getAndAddRequest(requestedField, n);
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, the getAndAddRequest
doesn't check for max-value and always does the CAS. I'd leave this as is and have a separate PR where the getAndAddRequest
is changed and all use places are verified/fixed.
Just a few comments, otherwise 👍. |
bf671e3
to
234a4c4
Compare
Changed to AtomicBoolean and moved c != 0 into parent because the parent was used for completing the inner anyway, I overlooked it at first. |
👍 |
1.x: concat reduce overhead when streaming a source
This PR reduces the request tracking overhead of
concat
by tracking the produced item count in a plain field and subtracting it from the arbiter and requested values only before the inner source completes. So instead of N decrementAndGet call, we have 1 addAndGet(-N) per source.I've added a perf class to measure the difference.
(Intel Celeron 1005M @ 2GHz, Windows 7 x64, Java 8u66)
The throughput increased considerably, although I would have expected more, especially in the 1M case where the subscription overhead doesn't matter.
I'll do further investigation on it and post a follow-up PR if this gets merged in the meantime.