Merge pull request #21 from sharelatex/fix-urlfetcher-streams
clean up error handling in UrlFetcher
This commit is contained in:
@@ -2,33 +2,55 @@ request = require("request").defaults(jar: false)
|
|||||||
fs = require("fs")
|
fs = require("fs")
|
||||||
logger = require "logger-sharelatex"
|
logger = require "logger-sharelatex"
|
||||||
|
|
||||||
|
oneMinute = 60 * 1000
|
||||||
|
|
||||||
module.exports = UrlFetcher =
|
module.exports = UrlFetcher =
|
||||||
pipeUrlToFile: (url, filePath, _callback = (error) ->) ->
|
pipeUrlToFile: (url, filePath, _callback = (error) ->) ->
|
||||||
callbackOnce = (error) ->
|
callbackOnce = (error) ->
|
||||||
cleanUp error, (error) ->
|
clearTimeout timeoutHandler if timeoutHandler?
|
||||||
_callback(error)
|
_callback(error)
|
||||||
_callback = () ->
|
_callback = () ->
|
||||||
|
|
||||||
cleanUp = (error, callback) ->
|
timeoutHandler = setTimeout () ->
|
||||||
if error?
|
timeoutHandler = null
|
||||||
logger.log filePath: filePath, "deleting file from cache due to error"
|
logger.error url:url, filePath: filePath, "Timed out downloading file to cache"
|
||||||
fs.unlink filePath, (err) ->
|
callbackOnce(new Error("Timed out downloading file to cache #{url}"))
|
||||||
if err?
|
# FIXME: maybe need to close fileStream here
|
||||||
logger.err err: err, filePath: filePath, "error deleting file from cache"
|
, 3 * oneMinute
|
||||||
callback(error)
|
|
||||||
else
|
|
||||||
callback()
|
|
||||||
|
|
||||||
fileStream = fs.createWriteStream(filePath)
|
logger.log url:url, filePath: filePath, "started downloading url to cache"
|
||||||
fileStream.on 'error', (error) ->
|
urlStream = request.get({url: url, timeout: oneMinute})
|
||||||
logger.error err: error, url:url, filePath: filePath, "error writing file into cache"
|
urlStream.pause() # stop data flowing until we are ready
|
||||||
callbackOnce(error)
|
|
||||||
|
# attach handlers before setting up pipes
|
||||||
|
urlStream.on "error", (error) ->
|
||||||
|
logger.error err: error, url:url, filePath: filePath, "error downloading url"
|
||||||
|
callbackOnce(error or new Error("Something went wrong downloading the URL #{url}"))
|
||||||
|
|
||||||
|
urlStream.on "end", () ->
|
||||||
|
logger.log url:url, filePath: filePath, "finished downloading file into cache"
|
||||||
|
|
||||||
logger.log url:url, filePath: filePath, "downloading url to cache"
|
|
||||||
urlStream = request.get(url)
|
|
||||||
urlStream.on "response", (res) ->
|
urlStream.on "response", (res) ->
|
||||||
if res.statusCode >= 200 and res.statusCode < 300
|
if res.statusCode >= 200 and res.statusCode < 300
|
||||||
|
fileStream = fs.createWriteStream(filePath)
|
||||||
|
|
||||||
|
# attach handlers before setting up pipes
|
||||||
|
fileStream.on 'error', (error) ->
|
||||||
|
logger.error err: error, url:url, filePath: filePath, "error writing file into cache"
|
||||||
|
fs.unlink filePath, (err) ->
|
||||||
|
if err?
|
||||||
|
logger.err err: err, filePath: filePath, "error deleting file from cache"
|
||||||
|
callbackOnce(error)
|
||||||
|
|
||||||
|
fileStream.on 'finish', () ->
|
||||||
|
logger.log url:url, filePath: filePath, "finished writing file into cache"
|
||||||
|
callbackOnce()
|
||||||
|
|
||||||
|
fileStream.on 'pipe', () ->
|
||||||
|
logger.log url:url, filePath: filePath, "piping into filestream"
|
||||||
|
|
||||||
urlStream.pipe(fileStream)
|
urlStream.pipe(fileStream)
|
||||||
|
urlStream.resume() # now we are ready to handle the data
|
||||||
else
|
else
|
||||||
logger.error statusCode: res.statusCode, url:url, filePath: filePath, "unexpected status code downloading url to cache"
|
logger.error statusCode: res.statusCode, url:url, filePath: filePath, "unexpected status code downloading url to cache"
|
||||||
# https://nodejs.org/api/http.html#http_class_http_clientrequest
|
# https://nodejs.org/api/http.html#http_class_http_clientrequest
|
||||||
@@ -39,15 +61,5 @@ module.exports = UrlFetcher =
|
|||||||
# method. Until the data is consumed, the 'end' event will not
|
# method. Until the data is consumed, the 'end' event will not
|
||||||
# fire. Also, until the data is read it will consume memory
|
# fire. Also, until the data is read it will consume memory
|
||||||
# that can eventually lead to a 'process out of memory' error.
|
# that can eventually lead to a 'process out of memory' error.
|
||||||
urlStream.on 'data', () -> # discard the data
|
urlStream.resume() # discard the data
|
||||||
callbackOnce(new Error("URL returned non-success status code: #{res.statusCode} #{url}"))
|
callbackOnce(new Error("URL returned non-success status code: #{res.statusCode} #{url}"))
|
||||||
|
|
||||||
urlStream.on "error", (error) ->
|
|
||||||
logger.error err: error, url:url, filePath: filePath, "error downloading url"
|
|
||||||
callbackOnce(error or new Error("Something went wrong downloading the URL #{url}"))
|
|
||||||
|
|
||||||
urlStream.on "end", () ->
|
|
||||||
# FIXME: what if we get an error writing the file? Maybe we
|
|
||||||
# should be using the fileStream end event as the point of
|
|
||||||
# callback.
|
|
||||||
callbackOnce()
|
|
||||||
|
|||||||
@@ -22,25 +22,29 @@ describe "UrlFetcher", ->
|
|||||||
@path = "/path/to/file/on/disk"
|
@path = "/path/to/file/on/disk"
|
||||||
@request.get = sinon.stub().returns(@urlStream = new EventEmitter)
|
@request.get = sinon.stub().returns(@urlStream = new EventEmitter)
|
||||||
@urlStream.pipe = sinon.stub()
|
@urlStream.pipe = sinon.stub()
|
||||||
@fs.createWriteStream = sinon.stub().returns(@fileStream = { on: () -> })
|
@urlStream.pause = sinon.stub()
|
||||||
|
@urlStream.resume = sinon.stub()
|
||||||
|
@fs.createWriteStream = sinon.stub().returns(@fileStream = new EventEmitter)
|
||||||
@fs.unlink = (file, callback) -> callback()
|
@fs.unlink = (file, callback) -> callback()
|
||||||
@UrlFetcher.pipeUrlToFile(@url, @path, @callback)
|
@UrlFetcher.pipeUrlToFile(@url, @path, @callback)
|
||||||
|
|
||||||
it "should request the URL", ->
|
it "should request the URL", ->
|
||||||
@request.get
|
@request.get
|
||||||
.calledWith(@url)
|
.calledWith(sinon.match {"url": @url})
|
||||||
.should.equal true
|
.should.equal true
|
||||||
|
|
||||||
it "should open the file for writing", ->
|
|
||||||
@fs.createWriteStream
|
|
||||||
.calledWith(@path)
|
|
||||||
.should.equal true
|
|
||||||
|
|
||||||
describe "successfully", ->
|
describe "successfully", ->
|
||||||
beforeEach ->
|
beforeEach ->
|
||||||
@res = statusCode: 200
|
@res = statusCode: 200
|
||||||
@urlStream.emit "response", @res
|
@urlStream.emit "response", @res
|
||||||
@urlStream.emit "end"
|
@urlStream.emit "end"
|
||||||
|
@fileStream.emit "finish"
|
||||||
|
|
||||||
|
it "should open the file for writing", ->
|
||||||
|
@fs.createWriteStream
|
||||||
|
.calledWith(@path)
|
||||||
|
.should.equal true
|
||||||
|
|
||||||
it "should pipe the URL to the file", ->
|
it "should pipe the URL to the file", ->
|
||||||
@urlStream.pipe
|
@urlStream.pipe
|
||||||
|
|||||||
Reference in New Issue
Block a user