diff --git a/internal/shim/byline.js b/internal/shim/byline.js deleted file mode 100644 index ca1494fc..00000000 --- a/internal/shim/byline.js +++ /dev/null @@ -1,152 +0,0 @@ -// Copyright (C) 2011-2015 John Hewson -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to -// deal in the Software without restriction, including without limitation the -// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or -// sell copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS -// IN THE SOFTWARE. - -var stream = require('stream'), - util = require('util'); - -// convinience API -module.exports = function(readStream, options) { - return module.exports.createStream(readStream, options); -}; - -// basic API -module.exports.createStream = function(readStream, options) { - if (readStream) { - return createLineStream(readStream, options); - } else { - return new LineStream(options); - } -}; - -// deprecated API -module.exports.createLineStream = function(readStream) { - console.log('WARNING: byline#createLineStream is deprecated and will be removed soon'); - return createLineStream(readStream); -}; - -function createLineStream(readStream, options) { - if (!readStream) { - throw new Error('expected readStream'); - } - if (!readStream.readable) { - throw new Error('readStream must be readable'); - } - var ls = new LineStream(options); - readStream.pipe(ls); - return ls; -} - -// -// using the new node v0.10 "streams2" API -// - -module.exports.LineStream = LineStream; - -function LineStream(options) { - stream.Transform.call(this, options); - options = options || {}; - - // use objectMode to stop the output from being buffered - // which re-concatanates the lines, just without newlines. - this._readableState.objectMode = true; - this._lineBuffer = []; - this._keepEmptyLines = options.keepEmptyLines || false; - this._lastChunkEndedWithCR = false; - - // take the source's encoding if we don't have one - this.on('pipe', function(src) { - if (!this.encoding) { - // but we can't do this for old-style streams - if (src instanceof stream.Readable) { - this.encoding = src._readableState.encoding; - } - } - }); -} -util.inherits(LineStream, stream.Transform); - -LineStream.prototype._transform = function(chunk, encoding, done) { - // decode binary chunks as UTF-8 - encoding = encoding || 'utf8'; - - if (Buffer.isBuffer(chunk)) { - if (encoding == 'buffer') { - chunk = chunk.toString(); // utf8 - encoding = 'utf8'; - } - else { - chunk = chunk.toString(encoding); - } - } - this._chunkEncoding = encoding; - - var lines = chunk.split(/\r\n|\r|\n/g); - - // don't split CRLF which spans chunks - if (this._lastChunkEndedWithCR && chunk[0] == '\n') { - lines.shift(); - } - - if (this._lineBuffer.length > 0) { - this._lineBuffer[this._lineBuffer.length - 1] += lines[0]; - lines.shift(); - } - - this._lastChunkEndedWithCR = chunk[chunk.length - 1] == '\r'; - this._lineBuffer = this._lineBuffer.concat(lines); - this._pushBuffer(encoding, 1, done); -}; - -LineStream.prototype._pushBuffer = function(encoding, keep, done) { - // always buffer the last (possibly partial) line - while (this._lineBuffer.length > keep) { - var line = this._lineBuffer.shift(); - // skip empty lines - if (this._keepEmptyLines || line.length > 0 ) { - if (!this.push(this._reencode(line, encoding))) { - // when the high-water mark is reached, defer pushes until the next tick - var self = this; - setImmediate(function() { - self._pushBuffer(encoding, keep, done); - }); - return; - } - } - } - done(); -}; - -LineStream.prototype._flush = function(done) { - this._pushBuffer(this._chunkEncoding, 0, done); -}; - -// see Readable::push -LineStream.prototype._reencode = function(line, chunkEncoding) { - if (this.encoding && this.encoding != chunkEncoding) { - return new Buffer(line, chunkEncoding).toString(this.encoding); - } - else if (this.encoding) { - // this should be the most common case, i.e. we're using an encoded source stream - return line; - } - else { - return new Buffer(line, chunkEncoding); - } -}; diff --git a/internal/shim/index.js b/internal/shim/index.js index 58d609ef..6b675f6b 100644 --- a/internal/shim/index.js +++ b/internal/shim/index.js @@ -1,6 +1,4 @@ - -var child = require('child_process'); -var byline = require('./byline'); +const child = require('child_process'); /** * Debug env var. @@ -13,13 +11,13 @@ const debug = process.env.DEBUG_SHIM; * many concurrent requests are outstanding. */ -var callbacks = {}; +const callbacks = new Map(); /** * The last id attached to a request / callback pair */ -var lastId = (Date.now() / 1000) | 0; +let lastId = (Date.now() / 1000) | 0; /** * nextId generates ids which will only be repeated every 2^52 times being generated @@ -28,19 +26,56 @@ var lastId = (Date.now() / 1000) | 0; function nextId(){ // Prevent bugs where integer precision wraps around on floating point numbers // (usually around 52-53 bits) - var id = (lastId + 1) | 0; + let id = (lastId + 1) | 0; if (id === lastId) { id = 1; } + lastId = id; return String(id); } +/** + * handleLine is responsible for taking a line of output from the child process + * and calling the appropiate callbacks. + */ +function handleLine(line) { + if (debug) { + console.log('[shim] parsing: `%s`', line); + } + + let msg; + try { + msg = JSON.parse(line); + } catch (err) { + console.log('[shim] unexpected non-json line: `%s`', line); + return; + } + + if (typeof msg.id !== 'string') { + console.log('[shim] unexpected line - do not use stdout: `%s`', line); + return; + } + + const c = callbacks.get(msg.id); + callbacks.delete(msg.id); + + if (!c) { + if (debug) { + console.log('[shim] unexpected duplicate response: `%s`', line); + } + + return; + } + + c(msg.error, msg.value); +} + /** * Child process for binary I/O. */ -var proc = child.spawn('./main', { stdio: ['pipe', 'pipe', process.stderr] }); +const proc = child.spawn('./main', { stdio: ['pipe', 'pipe', process.stderr] }); proc.on('error', function(err){ console.error('[shim] error: %s', err); @@ -56,36 +91,56 @@ proc.on('exit', function(code, signal){ * Newline-delimited JSON stdout. */ -var out = byline(proc.stdout) - -out.on('data', function(line){ - if (debug) console.log('[shim] parsing: `%s`', line) - - var msg; - try { - msg = JSON.parse(line); - } catch (err) { - console.log('[shim] unexpected non-json line: `%s`', line); - return +// Chunks holds onto partial chunks received in the absense of a newline. +// invariant: an array of Buffer objects, all of which do not have any newline characters +let chunks = []; +const NEWLINE = '\n'.charCodeAt(0); + +// Find successive newlines in this chunk, and pass them along to `handleChunk` +function handleChunk(chunk) { + // since this current chunk can have multple lines inside of it + // keep track of how much of the current chunk we've consumed + let chunkPos = 0; + for (;;) { + // Find the first newline in the current, in the part of the current chunk we have not + // looked yet. + const newlinePos = chunk.indexOf(NEWLINE, chunkPos); + + // We were not able to find any more newline characters in this chunk, + // save the remaineder in `chunks` for later processing + if (newlinePos === -1) { + chunks.push(chunk.slice(chunkPos)); + break; + } + + // We have found an end of a whole line, the beginning of the line will be the combination + // of all Buffers currently buffered in the `chunks` array (if any) + const start = chunk.slice(chunkPos, newlinePos); + + chunks.push(start); + const line = Buffer.concat(chunks); + chunks = []; + + // increase the chunk position, to skip over the last line we just found + chunkPos = newlinePos + 1; + handleLine(line) } +} - if (typeof msg.id !== 'string') { - console.log('[shim] unexpected line - do not use stdout: `%s`', line); - return - } +const out = proc.stdout; - const c = callbacks[msg.id]; - delete callbacks[msg.id]; +out.on('readable', () => { + for (;;) { + const chunk = out.read(); + if (chunk === null) { + break; + } - if (!c) { - if (debug) console.log('[shim] unexpected duplicate response: `%s`', line) - return + // Pump all data chunks into chunk handler + handleChunk(chunk); } - - c(msg.error, msg.value); }); - /** * Handle events. */ @@ -93,7 +148,7 @@ exports.handle = function(event, ctx, cb) { ctx.callbackWaitsForEmptyEventLoop = false; const id = nextId(); - callbacks[id] = cb; + callbacks.set(id, cb); proc.stdin.write(JSON.stringify({ "id": id, diff --git a/internal/zip/zip.go b/internal/zip/zip.go index 9dd1ccdf..a3bb797d 100644 --- a/internal/zip/zip.go +++ b/internal/zip/zip.go @@ -8,7 +8,7 @@ import ( "strings" "github.com/pkg/errors" - "github.com/tj/go-archive" + archive "github.com/tj/go-archive" ) var transform = archive.TransformFunc(func(r io.Reader, i os.FileInfo) (io.Reader, os.FileInfo) { @@ -37,7 +37,7 @@ func Build(dir string) (io.ReadCloser, *archive.Stats, error) { strings.NewReader(".*\n"), strings.NewReader("\n!vendor\n!node_modules/**\n!.pypath/**\n"), upignore, - strings.NewReader("\n!main\n!server\n!_proxy.js\n!byline.js\n!up.json\n!pom.xml\n!build.gradle\n!project.clj\ngin-bin\nup\n")) + strings.NewReader("\n!main\n!server\n!_proxy.js\n!up.json\n!pom.xml\n!build.gradle\n!project.clj\ngin-bin\nup\n")) filter, err := archive.FilterPatterns(r) if err != nil { diff --git a/platform/lambda/lambda.go b/platform/lambda/lambda.go index acab258e..2b42cca1 100644 --- a/platform/lambda/lambda.go +++ b/platform/lambda/lambda.go @@ -907,10 +907,6 @@ func (p *Platform) injectProxy() error { return errors.Wrap(err, "writing up-proxy") } - if err := ioutil.WriteFile("byline.js", shim.MustAsset("byline.js"), 0755); err != nil { - return errors.Wrap(err, "writing byline.js") - } - if err := ioutil.WriteFile("_proxy.js", shim.MustAsset("index.js"), 0755); err != nil { return errors.Wrap(err, "writing _proxy.js") } @@ -923,7 +919,6 @@ func (p *Platform) removeProxy() error { log.Debugf("removing proxy") os.Remove("main") os.Remove("_proxy.js") - os.Remove("byline.js") return nil }