-
-
Notifications
You must be signed in to change notification settings - Fork 80
Implement non-buffered streaming for step logs #438
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
8393a07
Implement non-buffered streaming for step logs
das7pad f4d6f1e
Use Jenkins core incremental to pull in LargeText interface
das7pad 515f67e
More robust check for stopping at previously determined EOF
das7pad 9dea640
Increase minimum Jenkins version for LargeText interface support
das7pad 01d7de0
Use most recent bom baseline
timja File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,9 +31,11 @@ | |
| import hudson.model.BuildListener; | ||
| import hudson.model.TaskListener; | ||
| import java.io.BufferedReader; | ||
| import java.io.EOFException; | ||
| import java.io.File; | ||
| import java.io.FileOutputStream; | ||
| import java.io.FilterOutputStream; | ||
| import java.io.InputStream; | ||
| import java.io.IOException; | ||
| import java.io.OutputStream; | ||
| import java.io.OutputStreamWriter; | ||
|
|
@@ -54,6 +56,7 @@ | |
| import org.kohsuke.accmod.Restricted; | ||
| import org.kohsuke.accmod.restrictions.Beta; | ||
| import org.kohsuke.stapler.framework.io.ByteBuffer; | ||
| import org.kohsuke.stapler.framework.io.LargeText; | ||
|
|
||
| /** | ||
| * Simple implementation of log storage in a single file that maintains a side file with an index indicating where node transitions occur. | ||
|
|
@@ -268,23 +271,77 @@ | |
| @NonNull | ||
| @Override public AnnotatedLargeText<FlowNode> stepLog(@NonNull FlowNode node, boolean complete) { | ||
| maybeFlush(); | ||
| String id = node.getId(); | ||
| try (ByteBuffer buf = new ByteBuffer(); | ||
| RandomAccessFile raf = new RandomAccessFile(log, "r"); | ||
| BufferedReader indexBR = index.isFile() ? Files.newBufferedReader(index.toPath(), StandardCharsets.UTF_8) : new BufferedReader(new NullReader(0))) { | ||
| // Check this _before_ reading index-log to reduce the chance of a race condition resulting in recent content being associated with the wrong step: | ||
| long end = raf.length(); | ||
| // To produce just the output for a single step (again we do not need to pay attention to ConsoleNote here since AnnotatedLargeText handles it), | ||
| // index-log is read looking for transitions that pertain to this step: beginning or ending its content, including at EOF if applicable. | ||
| // (Other transitions, such as to or from unrelated steps, are irrelevant). | ||
| // Once a start and end position have been identified, that block is copied to a memory buffer. | ||
| String line; | ||
| long pos = -1; // -1 if not currently in this node, start position if we are | ||
| while ((line = indexBR.readLine()) != null) { | ||
| long rawLogSize; | ||
| long stepLogSize = 0; | ||
| String nodeId = node.getId(); | ||
| try (RandomAccessFile raf = new RandomAccessFile(log, "r")) { | ||
| // Check this _before_ reading index-log to reduce the chance of a race condition resulting in recent content being associated with the wrong step. | ||
| rawLogSize = raf.length(); | ||
| if (index.isFile()) { | ||
| try (IndexReader idr = new IndexReader(rawLogSize, nodeId)) { | ||
| stepLogSize = idr.getStepLogSize(); | ||
| } | ||
| } | ||
| } catch (IOException x) { | ||
| return new BrokenLogStorage(x).stepLog(node, complete); | ||
| } | ||
| if (stepLogSize == 0) { | ||
| return new AnnotatedLargeText<>(new ByteBuffer(), StandardCharsets.UTF_8, complete, node); | ||
| } | ||
| return new AnnotatedLargeText<>(new StreamingStepLog(rawLogSize, stepLogSize, nodeId), StandardCharsets.UTF_8, complete, node); | ||
| } | ||
|
|
||
| private class IndexReader implements AutoCloseable { | ||
| static class Next { | ||
| public long start = -1; | ||
| public long end = -1; | ||
| } | ||
| private final String nodeId; | ||
| private final long rawLogSize; | ||
| private boolean done; | ||
| private BufferedReader indexBR = null; | ||
| private long pos = -1; // -1 if not currently in this node, start position if we are | ||
|
|
||
| public IndexReader(long rawLogSize, String nodeId) { | ||
| this.rawLogSize = rawLogSize; | ||
| this.nodeId = nodeId; | ||
| } | ||
|
|
||
| /** | ||
| * Closes the underlying index reader if one has been opened and clears the reference, so that a repeated call does nothing. | ||
| * | ||
| * @throws IOException if closing the underlying reader fails | ||
| */ | ||
| @Override | ||
| public void close() throws IOException { | ||
| if (indexBR != null) { | ||
| indexBR.close(); | ||
| indexBR = null; | ||
| } | ||
| } | ||
|
|
||
| private void ensureOpen() throws IOException { | ||
| if (indexBR == null) { | ||
| indexBR = Files.newBufferedReader(index.toPath(), StandardCharsets.UTF_8); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Adds up the lengths of all sections that {@link #readNext} still reports for this node. | ||
| * | ||
| * @return the sum of {@code end - start} over every reported section, or 0 if none is reported | ||
| * @throws IOException if reading the index fails | ||
| */ | ||
| public long getStepLogSize() throws IOException { | ||
| final Next section = new Next(); | ||
| long total = 0; | ||
| for (boolean more = readNext(section); more; more = readNext(section)) { | ||
| total += section.end - section.start; | ||
| } | ||
| return total; | ||
| } | ||
|
|
||
| public boolean readNext(Next next) throws IOException { | ||
| if (done) return false; | ||
| ensureOpen(); | ||
| while (!done) { | ||
| String line = indexBR.readLine(); | ||
| if (line == null) { | ||
| done = true; | ||
| break; | ||
| } | ||
| int space = line.indexOf(' '); | ||
| long lastTransition = -1; | ||
| long nextTransition; | ||
| try { | ||
| lastTransition = Long.parseLong(space == -1 ? line : line.substring(0, space)); | ||
| nextTransition = Long.parseLong(space == -1 ? line : line.substring(0, space)); | ||
| } catch (NumberFormatException x) { | ||
| LOGGER.warning("Ignoring corrupt index file " + index); | ||
| // If index-log is corrupt for whatever reason, we give up on this step in this build; | ||
|
|
@@ -295,48 +352,152 @@ | |
| pos = -1; | ||
| continue; | ||
| } | ||
| if (nextTransition >= rawLogSize) { | ||
| // Do not emit positions past the previously determined logSize. | ||
| nextTransition = rawLogSize; | ||
| done = true; | ||
| } | ||
| if (pos == -1) { | ||
| if (space != -1 && line.substring(space + 1).equals(id)) { | ||
| pos = lastTransition; | ||
| } | ||
| } else if (lastTransition > pos) { | ||
| raf.seek(pos); | ||
| if (lastTransition > pos + Integer.MAX_VALUE) { | ||
| throw new IOException("Cannot read more than 2Gib at a time"); // ByteBuffer does not support it anyway | ||
| if (space != -1 && line.substring(space + 1).equals(nodeId)) { | ||
| pos = nextTransition; | ||
| } | ||
| // Could perhaps be done a bit more efficiently with FileChannel methods, | ||
| // at least if org.kohsuke.stapler.framework.io.ByteBuffer were replaced by java.nio.[Heap]ByteBuffer. | ||
| // The overall bottleneck here is however the need to use a memory buffer to begin with: | ||
| // LargeText.Source/Session are not public so, pending improvements to Stapler, | ||
| // we cannot lazily stream per-step content the way we do for the overall log. | ||
| // (Except perhaps by extending ByteBuffer and then overriding every public method!) | ||
| // LargeText also needs to be improved to support opaque (non-long) cursors | ||
| // (and callers such as progressiveText.jelly and Blue Ocean updated accordingly), | ||
| // which is a hard requirement for efficient rendering of cloud-backed logs, | ||
|
Comment on lines
-313
to
-315
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Still true I think. (Blue Ocean is moribund, but the same comment potentially applies to anything rendering JEP-210-based logs.) OTOH there has been no recent development in this area; CloudBees does maintain a cloud-backed log storage, but largely using private API contracts. |
||
| // though for this implementation we do not need it since we can work with byte offsets. | ||
| byte[] data = new byte[(int) (lastTransition - pos)]; | ||
| raf.readFully(data); | ||
| buf.write(data); | ||
| } else if (nextTransition > pos) { | ||
| next.start = pos; | ||
| next.end = nextTransition; | ||
| pos = -1; | ||
| return true; | ||
| } else { | ||
| // Some sort of mismatch. Do not emit this section. | ||
| pos = -1; | ||
| } | ||
| } | ||
| if (pos != -1 && /* otherwise race condition? */ end > pos) { | ||
| if (pos != -1 && rawLogSize > pos) { | ||
| // In case the build is ongoing and we are still actively writing content for this step, | ||
| // we will hit EOF before any other transition. Otherwise identical to normal case above. | ||
| raf.seek(pos); | ||
| if (end > pos + Integer.MAX_VALUE) { | ||
| throw new IOException("Cannot read more than 2Gib at a time"); | ||
| next.start = pos; | ||
| next.end = rawLogSize; | ||
| return true; | ||
| } | ||
| return false; | ||
| } | ||
| } | ||
|
|
||
| private class StreamingStepLog implements LargeText.Source { | ||
| private final String nodeId; | ||
| private final long rawLogSize; | ||
| private final long stepLogSize; | ||
|
|
||
| StreamingStepLog(long rawLogSize, long stepLogSize, String nodeId ) { | ||
| super(); | ||
| this.rawLogSize = rawLogSize; | ||
| this.stepLogSize = stepLogSize; | ||
| this.nodeId = nodeId; | ||
| } | ||
|
|
||
| public boolean exists() { | ||
| return true; | ||
| } | ||
|
|
||
| public long length() { | ||
| return stepLogSize; | ||
| } | ||
|
|
||
| public LargeText.Session open() { | ||
| return new StreamingStepLogSession(); | ||
| } | ||
|
|
||
| class StreamingStepLogSession extends InputStream implements LargeText.Session { | ||
| private RandomAccessFile rawLog; | ||
| private final IndexReader.Next next = new IndexReader.Next(); | ||
| private IndexReader indexReader; | ||
| private long rawLogPos = next.end; | ||
| private long stepLogPos = 0; | ||
|
|
||
| /** | ||
| * Releases the raw log handle and the index reader, clearing each reference once it has been closed. | ||
| * The index reader is closed even when closing the raw log throws. | ||
| * | ||
| * @throws IOException if closing either resource fails | ||
| */ | ||
| @Override | ||
| public void close() throws IOException { | ||
| try { | ||
| final RandomAccessFile openLog = rawLog; | ||
| if (openLog != null) { | ||
| openLog.close(); | ||
| rawLog = null; | ||
| } | ||
| } finally { | ||
| final IndexReader openIndex = indexReader; | ||
| if (openIndex != null) { | ||
| openIndex.close(); | ||
| indexReader = null; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Skips forward over {@code n} bytes of this step's log by walking the index sections, then positions the raw log at the resulting offset. | ||
| * The skip is all-or-nothing: a request reaching past the end of the step log skips nothing. | ||
| * | ||
| * @param n number of step-log bytes to skip | ||
| * @return {@code n} if the whole request was skipped; 0 if {@code n} is not positive or exceeds the bytes left in the step log | ||
| * @throws IOException if the index or the raw log cannot be read, including an {@link EOFException} when the index ends early | ||
| */ | ||
| @Override | ||
| public long skip(long n) throws IOException { | ||
| // Zero or negative requests skip nothing. Returning here also keeps a fresh session from seeking to its initial rawLogPos of -1. | ||
| if (n <= 0) return 0; | ||
| // Compare via subtraction so that a very large n cannot overflow the bounds check. | ||
| if (n > stepLogSize - stepLogPos) { | ||
| return 0; | ||
| } | ||
|
|
||
| ensureOpen(); | ||
| long skipped = 0; | ||
| while (skipped < n) { | ||
| // No seek per section; a single seek is done once the final position is known. | ||
| advanceNextIfNeeded(false); | ||
| long remainingInNext = next.end - rawLogPos; | ||
| long remainingToSkip = n - skipped; | ||
| long skip = Long.min(remainingInNext, remainingToSkip); | ||
| rawLogPos += skip; | ||
| stepLogPos += skip; | ||
| skipped += skip; | ||
| } | ||
| rawLog.seek(rawLogPos); | ||
| return skipped; | ||
| } | ||
|
|
||
| /** | ||
| * Reads a single byte of this step's log by delegating to the bulk read. | ||
| * | ||
| * @return the byte as an unsigned value in the range 0-255, or -1 if the bulk read did not deliver exactly one byte | ||
| * @throws IOException if the bulk read fails | ||
| */ | ||
| @Override | ||
| public int read() throws IOException { | ||
| byte[] b = new byte[1]; | ||
| int n = read(b, 0, 1); | ||
| if (n != 1) return -1; | ||
| // Mask to an unsigned value: a plain cast sign-extends bytes >= 0x80, and 0xFF would come back as -1, which callers treat as end of stream. | ||
| return b[0] & 0xFF; | ||
| } | ||
|
|
||
| @Override | ||
| public int read(@NonNull byte[] b) throws IOException { | ||
| return read(b, 0, b.length); | ||
| } | ||
|
|
||
| /** | ||
| * Reads up to {@code len} bytes of this step's log into {@code b}, never reading past the end of the current index section. | ||
| * | ||
| * @param b destination buffer | ||
| * @param off offset in {@code b} at which to start storing bytes | ||
| * @param len maximum number of bytes to read | ||
| * @return the number of bytes read, or -1 once the step log has been fully consumed or the raw log ends early | ||
| * @throws IOException if the index or the raw log cannot be read, including an {@link EOFException} when the index ends early | ||
| */ | ||
| @Override | ||
| public int read(@NonNull byte[] b, int off, int len) throws IOException { | ||
| if (stepLogPos >= stepLogSize) { | ||
| return -1; | ||
| } | ||
| ensureOpen(); | ||
| advanceNextIfNeeded(true); | ||
| long remaining = next.end - rawLogPos; | ||
| if (len > remaining) { | ||
| // len is an int and remaining is smaller, so no overflow is possible. | ||
| len = (int) remaining; | ||
| } | ||
| int n = rawLog.read(b, off, len); | ||
| if (n < 0) { | ||
| // The raw log hit end of file before the section end; report EOF without moving the positions backwards. | ||
| return -1; | ||
| } | ||
| rawLogPos += n; | ||
| stepLogPos += n; | ||
| return n; | ||
| } | ||
|
|
||
| /** | ||
| * Moves to the next index section once the current one has been fully consumed; does nothing while bytes remain in the current section. | ||
| * | ||
| * @param seek whether to also position the raw log at the start of the new section | ||
| * @throws EOFException if the index has no further section for this node | ||
| * @throws IOException if reading the index or seeking in the raw log fails | ||
| */ | ||
| private void advanceNextIfNeeded(boolean seek) throws IOException { | ||
| if (rawLogPos >= next.end) { | ||
| boolean found = indexReader.readNext(next); | ||
| if (!found) { | ||
| throw new EOFException("index truncated; did not reach previously discovered end of step log"); | ||
| } | ||
| if (seek) { | ||
| rawLog.seek(next.start); | ||
| } | ||
| rawLogPos = next.start; | ||
| } | ||
| } | ||
|
|
||
| private void ensureOpen() throws IOException { | ||
| if (rawLog == null) { | ||
| rawLog = new RandomAccessFile(log, "r"); | ||
| } | ||
| if (indexReader == null) { | ||
| indexReader = new IndexReader(rawLogSize, nodeId); | ||
| } | ||
| byte[] data = new byte[(int) (end - pos)]; | ||
| raf.readFully(data); | ||
| buf.write(data); | ||
| } | ||
| return new AnnotatedLargeText<>(buf, StandardCharsets.UTF_8, complete, node); | ||
| } catch (IOException x) { | ||
| return new BrokenLogStorage(x).stepLog(node, complete); | ||
| } | ||
| } | ||
|
|
||
|
|
||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does not seem particularly efficient, but I guess it can be optimized later as needed.