mkarg
Member

@mkarg mkarg commented Feb 15, 2025

According to https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream any line within an SSE event MUST NOT contain the characters \n or \r, nor the combination \r\n.

This PR also contains performance improvements, so the corrected code is in no case slower than the incorrect code, but even outperforms the original code in many cases. It replaces #5832.
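To illustrate the rule, here is a minimal sketch (not the code of this PR) that splits a payload at every \n, \r, or \r\n and emits one data: line per segment. The class and method names are hypothetical, and it works on a String for brevity, whereas the PR operates on an OutputStream.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not Jersey code: formats a payload as SSE "data:" lines. */
final class SseDataLines {

    /** Splits at \r\n, \r, or \n; a \r\n pair counts as one single line break. */
    static List<String> splitLines(String payload) {
        List<String> lines = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < payload.length(); i++) {
            char c = payload.charAt(i);
            if (c == '\n' || c == '\r') {
                lines.add(payload.substring(start, i));
                if (c == '\r' && i + 1 < payload.length() && payload.charAt(i + 1) == '\n') {
                    i++; // consume the \n of a \r\n pair
                }
                start = i + 1;
            }
        }
        lines.add(payload.substring(start));
        return lines;
    }

    /** Emits one "data: " line per segment, followed by the blank line that ends the event. */
    static String format(String payload) {
        StringBuilder sb = new StringBuilder();
        for (String line : splitLines(payload)) {
            sb.append("data: ").append(line).append('\n');
        }
        return sb.append('\n').toString();
    }

    public static void main(String[] args) {
        System.out.print(format("first\r\nsecond\rthird\nfourth"));
    }
}
```

With this approach no emitted line contains a raw \r or \n from the payload, and a client that joins the data: lines with \n recovers the original line structure.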

@mkarg mkarg mentioned this pull request Feb 15, 2025
@mkarg
Member Author

mkarg commented Feb 16, 2025

@jansupol This PR fails because of incorrect Copyright check in Jersey's POM.xml. According to Eclipse Foundation's rules, all projects MUST accept the short form having only the initial publication date (see https://www.eclipse.org/projects/handbook/#ip-copyright-headers). Apparently Jersey's POM.xml expects to find the latest date, which is wrong. What is your decision how to proceed?

@jansupol
Contributor

@mkarg In Jersey project, we follow the advice of Oracle legal department to contain the copyright year with the last year of a change. This is enforced by the glassfish copyright plugin created for that purpose. Do you have a hard time to increase the copyright year in the changed files?

@mkarg
Member Author

mkarg commented Feb 19, 2025

@mkarg In Jersey project, we follow the advice of Oracle legal department to contain the copyright year with the last year of a change. This is enforced by the glassfish copyright plugin created for that purpose. Do you have a hard time to increase the copyright year in the changed files?

You mean, besides me being an Eclipse Committer Member bound solely to Eclipse Foundation rules, not employed with Oracle, not bound to Oracle-internal rules?

The EF is pretty clear here:

Do we need to specify a range of years in the copyright statement?
No. In the past, legal advice was that the year of the initial creation of the content and the year of the last change should be reflected in the copyright header. This is no longer the case. Specify the year that the content was initially created in the copyright statement.

@mkarg
Member Author

mkarg commented Feb 21, 2025

@jansupol FYI: Fixed Copyright according to Oracle rules.

@mkarg
Member Author

mkarg commented Feb 23, 2025

Apparently you did not find the time to review / merge this PR, so I used the time to author a commit on top with a unit test for DataLeadStream. Unless I missed something, it should cover all possible edge cases around EOL handling (at the start, at the end, mixing of write(int) and write(byte[]), etc.). Maybe it is beneficial for the review; it actually was very beneficial when authoring the performance improvements of SSE (to be found eventually in separate PRs once this one is merged). 😃
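The unit test itself is not shown in this conversation. As a rough sketch of the kind of edge cases meant here, the following harness feeds chunks through the write(int) state machine posted further down in this thread and collects what reaches the underlying stream. The harness class EolEdgeCases and its render method are hypothetical names; DATA_LEAD and EOL are assumed to be "data: " and "\n".

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class EolEdgeCases {
    static final byte[] DATA_LEAD = "data: ".getBytes(StandardCharsets.UTF_8);
    static final byte[] EOL = {'\n'};

    /** write(int) state machine as posted in this thread; write(byte[]) is the inherited per-byte loop. */
    static final class DataLeadStream extends OutputStream {
        private final OutputStream entityStream;
        private int lastChar = -1; // pending character, not yet forwarded

        DataLeadStream(OutputStream entityStream) {
            this.entityStream = entityStream;
        }

        @Override
        public void write(int i) throws IOException {
            if (lastChar == -1) {
                entityStream.write(DATA_LEAD);
            } else if (lastChar != '\n' && lastChar != '\r') {
                entityStream.write(lastChar);
            } else if (lastChar == '\n' || (lastChar == '\r' && i != '\n')) {
                entityStream.write(EOL);
                entityStream.write(DATA_LEAD);
            }
            lastChar = i;
        }

        void finish() throws IOException {
            if (lastChar != -1) {
                write(-1);
            }
        }
    }

    /** Hypothetical harness: writes each chunk, finishes, and returns what reached the sink. */
    static String render(String... chunks) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        DataLeadStream dls = new DataLeadStream(sink);
        try {
            for (String chunk : chunks) {
                dls.write(chunk.getBytes(StandardCharsets.UTF_8));
            }
            dls.finish();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(sink.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // A \r\n pair split across two writes must still count as one line break.
        System.out.println(render("a\r", "\nb").equals(render("a\r\nb")));
    }
}
```

Cases worth checking with such a harness include an EOL as the very first byte, an EOL as the very last byte, consecutive EOLs, and a \r\n pair split across two writes.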

@mkarg
Member Author

mkarg commented Feb 24, 2025

@jansupol Anything more needed to review / merge this bug fix? 🤔

@jansupol
Contributor

This is what I did: I created a brief test as follows:

    private static final class DataLeadStream2 extends OutputStream { //Current PR DataLeadStream
        private final OutputStream entityStream;

        private int lastChar = -1;

        DataLeadStream2(final OutputStream entityStream) {
            this.entityStream = entityStream;
        }

        @Override
        public void write(final int i) throws IOException {
            if (lastChar == -1) {
                entityStream.write(DATA_LEAD);
            } else if (lastChar != '\n' && lastChar != '\r') {
                entityStream.write(lastChar);
            } else if (lastChar == '\n' || (lastChar == '\r' && i != '\n')) {
                entityStream.write(EOL);
                entityStream.write(DATA_LEAD);
            }

            lastChar = i;
        }

        void finish() throws IOException {
            if (lastChar != -1) {
                write(-1);
            }
        }
    }

    private static final class DataLeadStream1 extends OutputStream { // Previous PR DataLeadStream
        private final OutputStream entityStream;
        private int lastChar = '\n';

        DataLeadStream1(final OutputStream entityStream) {
            this.entityStream = entityStream;
        }

        @Override
        public final void write(final int i) throws IOException {
            if (lastChar == '\n') {
                entityStream.write(DATA_LEAD);
            }
            entityStream.write(i);
            lastChar = i;
        }
    }

    private static final class DataLeadStream0 extends OutputStream { // Original class DataLeadStream
        private final OutputStream entityStream;
        private boolean start = true;

        private DataLeadStream0(OutputStream entityStream) {
            this.entityStream = entityStream;
        }

        @Override
        public void write(final int i) throws IOException {
            if (start) {
                entityStream.write(DATA_LEAD);
                start = false;
            }
            entityStream.write(i);
            if (i == '\n') {
                entityStream.write(DATA_LEAD);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        int SIZE = 100000;
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i != 200 * SIZE; i++) {
            sb.append("0123456789");
        }

        OutputStream voidOS = new OutputStream() {
            @Override
            public void write(int b) throws IOException {
                //Ignore
            }
        };

        long time = System.currentTimeMillis();
        DataLeadStream2 dls = new DataLeadStream2(voidOS); //Try various DataLeadStreams
        for (int i = 0; i != 1000000 / SIZE; i++) {
            dls.write(sb.toString().getBytes());
            dls.finish(); // This slows DataLeadStream2 down by about 50%
        }
        System.out.println(System.currentTimeMillis() - time);
    }

The performance differs between SIZE=100 and SIZE=10000. DataLeadStream0 is better for short messages and DataLeadStream1 for large messages (SIZE > 10000), but DataLeadStream2 is now the slowest.

What exactly is the purpose of this change? The original PR mentioned performance, this mentions the \r data in the message, but the real reason to me was the empty message at the end. Can you provide a use-case which justifies the change in SSE? Thanks.

@mkarg
Member Author

mkarg commented Feb 26, 2025

TL;DR: The purpose of this PR is not performance but solely correctness, w.r.t. what is stated in the PR's description (this PR is just a bug fix). Performance will be recovered by a subsequent PR.

Explanation: The original PR you mention was intended to improve performance, but we both agreed that it fails because it fixes one bug while opening another; in addition, there has always been a bug where \r\n is not detected as one single EOL. Hence, I have separated the work as announced: First, this PR guarantees 100% correct syntax handling of all combinations of \r, \n, \r\n etc. Second, once this PR is merged, there will be another PR on top which improves performance (by implementing write(byte[], int, int)). The original PR you mentioned will be superseded by that one. After both new PRs are merged, the status will be that:

  • Handling of `\r\n` is 100% correct (thanks to this PR).
  • Performance will be much better, as strings are then sent using far fewer OutputStream.write calls than in the status quo (which does one such call per char): in the best case, a single write per String (thanks to the subsequent PR); in the worst case, it performs like the status quo.
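To make the point about the number of write calls concrete, here is a small sketch using hypothetical classes (none of them are Jersey code). A stream that overrides only write(int) inherits OutputStream's default bulk method, which forwards byte by byte, while a stream that also overrides write(byte[], int, int) can hand the whole range to the underlying stream at once.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

final class WriteCallCount {

    /** Sink that only counts how many write calls reach it. */
    static final class CountingSink extends OutputStream {
        int calls;

        @Override
        public void write(int b) {
            calls++;
        }

        @Override
        public void write(byte[] b, int off, int len) {
            calls++;
        }
    }

    /** Overrides only write(int): the inherited bulk write loops over single bytes. */
    static final class PerByte extends OutputStream {
        private final OutputStream out;

        PerByte(OutputStream out) {
            this.out = out;
        }

        @Override
        public void write(int b) throws IOException {
            out.write(b);
        }
    }

    /** Also overrides the bulk method and forwards the whole range in one call. */
    static final class Bulk extends OutputStream {
        private final OutputStream out;

        Bulk(OutputStream out) {
            this.out = out;
        }

        @Override
        public void write(int b) throws IOException {
            out.write(b);
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            out.write(b, off, len);
        }
    }

    /** Writes `length` bytes through one of the wrappers and returns the calls seen by the sink. */
    static int countCalls(boolean bulk, int length) {
        CountingSink sink = new CountingSink();
        OutputStream os = bulk ? new Bulk(sink) : new PerByte(sink);
        try {
            os.write(new byte[length]);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.calls;
    }

    public static void main(String[] args) {
        System.out.println("per-byte wrapper: " + countCalls(false, 1000) + " calls");
        System.out.println("bulk wrapper:     " + countCalls(true, 1000) + " calls");
    }
}
```

For a 1000-byte message the per-byte wrapper produces 1000 calls on the underlying stream and the bulk wrapper a single one. A stream that has to insert "data: " after each line break needs a few more calls than that, but still far fewer than one per byte.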

@jansupol
Contributor

While I agree that the current state does not work exactly as the SSE standard describes for the corner case of sending new lines, and there is an extra unnecessary empty message, I do not see a legitimate reason for making a change that sacrifices the performance. I agree that the change might be beneficial, but only if we had a similar performance.

Second, once this PR is merged, there will be another PR ontop which improves performance

Sorry, we cannot do a merge that significantly changes the performance with a hope that some future work may fix it, knowing that it may never come. I am sure you understand this.

@mkarg
Member Author

mkarg commented Feb 28, 2025

I do not agree that bug fixes must only get merged if they do not sacrifice performance, as this is rather often the case, actually.

Nevertheless, I will start benchmarking with my already developed performance improvement, so we have comparable numbers.

@mkarg
Member Author

mkarg commented Mar 8, 2025

TL;DR: Here is the speed-optimized SSE code. 😃

Sorry for the delay. I was tied up in other projects. I have now put some commits on top that provide superior performance, so the corrected code is in no case slower than the incorrect code. Benchmarks show that the new code (including the performance tweaks) is faster than the incorrect code in all cases. This mostly stems from the fact that I have implemented write(byte[], int, int), which always outperforms a loop over write(int) when using real I/O instead of a null output stream.

Having said that, here are the benchmark results:

Benchmark              (SIZE)   Mode  Cnt    Score    Error  Units
SsePerf.optimizedCode       1  thrpt   25    0,081 ±  0,005  ops/s
SsePerf.optimizedCode      10  thrpt   25    0,159 ±  0,004  ops/s
SsePerf.optimizedCode     100  thrpt   25    1,569 ±  0,055  ops/s
SsePerf.optimizedCode    1000  thrpt   25   15,698 ±  0,305  ops/s
SsePerf.optimizedCode   10000  thrpt   25  137,396 ±  3,876  ops/s
SsePerf.optimizedCode  100000  thrpt   25  829,894 ± 19,216  ops/s
SsePerf.originalCode        1  thrpt   25    0,078 ±  0,003  ops/s
SsePerf.originalCode       10  thrpt   25    0,071 ±  0,001  ops/s
SsePerf.originalCode      100  thrpt   25    0,077 ±  0,002  ops/s
SsePerf.originalCode     1000  thrpt   25    0,079 ±  0,003  ops/s
SsePerf.originalCode    10000  thrpt   25    0,078 ±  0,003  ops/s
SsePerf.originalCode   100000  thrpt   25    0,078 ±  0,002  ops/s

As the numbers show, the new solution is only slightly faster for super-short messages, but gets increasingly faster the longer the message gets, while the original code is equally slow regardless of message length. Non-trivial messages see roughly 20x (100 characters) to more than 1,700x (10,000 characters) better throughput. While the nearly 11,000x boost at a 100k message size looks impressive, most SSE messages in reality are certainly rather tiny.
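The speedup factors can be recomputed from the table above by dividing the optimizedCode score by the originalCode score for each SIZE. A minimal sketch, with a hypothetical class name and the scores copied from the table (error margins ignored):

```java
final class SpeedupTable {
    static final int[] SIZES = {1, 10, 100, 1000, 10000, 100000};
    // Throughput in ops/s, copied from the JMH results above.
    static final double[] OPTIMIZED = {0.081, 0.159, 1.569, 15.698, 137.396, 829.894};
    static final double[] ORIGINAL = {0.078, 0.071, 0.077, 0.079, 0.078, 0.078};

    /** Ratio of optimized to original throughput for the SIZE at the given index. */
    static double speedup(int index) {
        return OPTIMIZED[index] / ORIGINAL[index];
    }

    public static void main(String[] args) {
        for (int i = 0; i < SIZES.length; i++) {
            System.out.printf("SIZE %6d: %.1fx%n", SIZES[i], speedup(i));
        }
    }
}
```

Since the error intervals of the two smallest sizes are large relative to the scores, the ratios there should be read as rough indications only.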

I modified your benchmark in several ways:

  • Using JMH to get reliable and stable results.
  • Using real socket I/O instead of a null output stream, as real I/O has dramatically slower write performance, particularly for single-int and small arrays, so the null output stream numbers were misleading.
  • Using \r\n in the middle of each message to consider the additional write overhead of that case.
  • SIZE now more directly reflects the message length in characters for better understanding of the numbers.

FYI, here is the updated source code of the benchmark:

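import static java.nio.charset.StandardCharsets.UTF_8;

import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.Channels;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Objects;

import org.openjdk.jmh.Main;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;

public class SsePerf {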

    public static void main(String[] args) throws Throwable {
        Main.main(args);
    }

    @State(Scope.Thread)
    public static class MyState {
        @Param({"1", "10", "100", "1000", "10000", "100000"})
        public int SIZE;

        private byte[] data;
        private ServerSocketChannel serverChannel;
        private SocketChannel clientChannel;
        private OutputStream os;
        private volatile boolean running = true;

        @Setup(Level.Trial)
        public void beforeTrial() throws IOException {
            this.data = SIZE < 2 ? "X".repeat(SIZE).getBytes(UTF_8) : ("X".repeat(SIZE / 2 - 1) + "\r\n" + "X".repeat(SIZE / 2 - 1)).getBytes(UTF_8);
            this.serverChannel = ServerSocketChannel.open().bind(new InetSocketAddress(0));
            final var serverAddress = serverChannel.getLocalAddress();
            Thread.ofVirtual().start(() -> {
                try (final var acceptedChannel = serverChannel.accept(); final var is = Channels.newInputStream(acceptedChannel.socket().getChannel())) {
                    while (this.running)
                        is.skip(Integer.MAX_VALUE);
                } catch (final IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            clientChannel = SocketChannel.open(serverAddress);
            os = Channels.newOutputStream(clientChannel);
        }

        @TearDown(Level.Trial)
        public void afterTrial() throws IOException {
            this.running = false;
            this.os.close();
            this.clientChannel.close();
            this.serverChannel.close();
        }
    }

    @Benchmark
    public void optimizedCode(MyState state) throws IOException {
        try (final var dls = new DataLeadStream(state.os)) {
            for (int i = 0, n = 1000000 / state.SIZE; i < n; i++)
                dls.write(state.data);
            dls.finish();
        }
    }

    private static final byte[] DATA_LEAD = "data: ".getBytes(UTF_8);
    private static final byte[] EOL = {'\n'};

    static final class DataLeadStream extends OutputStream {
        private final OutputStream entityStream;

        private int lastChar = -1;

        DataLeadStream(final OutputStream entityStream) {
            this.entityStream = entityStream;
        }

        @Override
        public void write(final int i) throws IOException {
            if (lastChar == -1) {
                entityStream.write(DATA_LEAD);
            } else if (lastChar != '\n' && lastChar != '\r') {
                entityStream.write(lastChar);
            } else if (lastChar == '\n' || (lastChar == '\r' && i != '\n')) {
                entityStream.write(EOL);
                entityStream.write(DATA_LEAD);
            }

            lastChar = i;
        }

        private static int indexOfEol(final byte[] b, final int fromIndex, final int toIndex) {
            for (var i = fromIndex; i < toIndex; i++) {
                if (b[i] == '\n' || b[i] == '\r') {
                    return i;
                }
            }
            return -1;
        }

        @Override
        public void write(final byte[] b, final int off, final int len) throws IOException {
            Objects.checkFromIndexSize(off, len, b.length);
            if (len == 0) {
                return;
            }
            write(b[off]);
            if (len > 1) {
                final var end = off + len - 1;
                var i = off;
                for (var j = indexOfEol(b, i, end); j != -1; j = indexOfEol(b, i, end)) {
                    entityStream.write(b, i, j - i);
                    entityStream.write(EOL);
                    entityStream.write(DATA_LEAD);
                    if (b[j] == '\r' && b[j + 1] == '\n') {
                        j++;
                    }
                    i = ++j;
                }
                if (i < end) {
                    entityStream.write(b, i, end - i);
                }
                lastChar = b[end];
            }
        }

        void finish() throws IOException {
            if (lastChar != -1) {
                write(-1);
            }
        }
    }

    @Benchmark
    public void originalCode(MyState state) throws IOException {
        try (final var dls = new DataLeadStream0(state.os)) {
            for (int i = 0, n = 1000000 / state.SIZE; i < n; i++)
                dls.write(state.data);
        }
    }

    private static final class DataLeadStream0 extends OutputStream {
        private final OutputStream entityStream;
        private boolean start = true;

        private DataLeadStream0(OutputStream entityStream) {
            this.entityStream = entityStream;
        }

        @Override
        public void write(final int i) throws IOException {
            if (start) {
                entityStream.write(DATA_LEAD);
                start = false;
            }
            entityStream.write(i);
            if (i == '\n') {
                entityStream.write(DATA_LEAD);
            }
        }
    }
}

@mkarg
Member Author

mkarg commented Mar 9, 2025

@jansupol Build on Java 21 works fine, but fails on Java 11 with OSGI errors outside of the scope of this MR. So I assume it is not my fault. Can you please check this? Thanks.

@mkarg
Member Author

mkarg commented Mar 14, 2025

Kindly asking for review.

@jansupol
Contributor

@mkarg Your test results look marvelous, I will check, thanks!

Contributor

@jansupol jansupol left a comment

Looks good, thanks!

@jansupol jansupol merged commit c370b1a into eclipse-ee4j:3.1 Mar 26, 2025
7 checks passed
@senivam senivam added this to the 3.1.11 milestone Mar 26, 2025
@mkarg mkarg deleted the sse-fix branch March 27, 2025 12:07
@solrbot solrbot mentioned this pull request Aug 8, 2025