client.coffee | |
---|---|
assert = require 'assert'
ns = require './ns'
url = require 'url'
{Socket} = require 'net'
{Stream} = require 'stream'
{debug} = require './util' | |
A Client establishes a connection to a worker process. It takes a Its API is similar to | exports.Client = class Client extends Socket
constructor: ->
super
debug "client created" |
Initialize outgoing array to hold pending requests | @_outgoing = [] |
Incoming is used to point to the current response | @_incoming = null
self = this |
Once we've made the connect, process the next request | @on 'connect', -> self._processRequest() |
Finalize the request on close | @on 'close', -> self._finishRequest() |
Initialize the response netstring parser | @_initResponseParser()
_initResponseParser: ->
self = this |
Initialize a Netstring stream parser | nsStream = new ns.Stream this |
Listen for data and hand it to our parser | nsStream.on 'data', (data) ->
if self._incoming
self._incoming._receiveData data |
Bubble any errors | nsStream.on 'error', (exception) ->
self._incoming = null
self.emit 'error', exception
_processRequest: -> |
Process the request now if the socket is open and we aren't already handling a response | if @readyState is 'open' and !@_incoming
if request = @_outgoing[0]
debug "processing outgoing request 1/#{@_outgoing.length}"
@_incoming = new ClientResponse this, request |
Flush the request buffer into socket | request.assignSocket @
else |
Try to reconnect and try again soon | @reconnect()
_finishRequest: ->
debug "finishing request"
request = @_outgoing.shift()
request.detachSocket @
res = @_incoming
@_incoming = null
if res is null or res.received is false
@emit 'error', new Error "Response was not received"
else if res.completed is false and res.readable is true
@emit 'error', new Error "Response was not completed" |
Anymore requests, continue processing | if @_outgoing.length > 0
@_processRequest() |
Reconnect if the connection is closed. | reconnect: ->
if @readyState is 'closed'
debug "connecting to #{@port}"
@connect @port, @host |
Start the connection and create a ClientRequest. | request: (args...) ->
request = new ClientRequest args...
@_outgoing.push request
@_processRequest()
request |
Proxy a | proxyRequest: (serverRequest, serverResponse, metaVariables = {}) ->
metaVariables["REMOTE_ADDR"] ?= serverRequest.connection.remoteAddress
metaVariables["REMOTE_PORT"] ?= serverRequest.connection.remotePort
clientRequest = @request serverRequest.method, serverRequest.url, serverRequest.headers, metaVariables
serverRequest.on 'data', (data) -> clientRequest.write data
serverRequest.on 'end', -> clientRequest.end()
serverRequest.on 'error', -> clientRequest.end()
clientRequest.on 'error', -> serverRequest.destroy()
clientRequest.on 'response', (clientResponse) ->
serverResponse.writeHead clientResponse.statusCode, clientResponse.headers
clientResponse.pipe serverResponse
clientRequest |
Public API for creating a Client | exports.createConnection = (port, host) ->
client = new Client
client.port = port
client.host = host
client |
Empty netstring signals EOF | END_OF_FILE = ns.nsWrite "" |
A ClientRequest is returned when It is a Writable Stream and responds to the conventional
Its also an EventEmitter with the following events:
| exports.ClientRequest = class ClientRequest extends Stream
constructor: (@method, @path, headers, metaVariables) ->
debug "requesting #{@method} #{@path}"
@writeable = true |
Initialize writeQueue since socket is still connecting net.Stream will buffer on connecting in node 0.3.x | @_writeQueue = [] |
Build an | @_parseEnv headers, metaVariables |
Then write it to the socket | @write JSON.stringify @env
_parseEnv: (headers, metaVariables) ->
@env = {} |
Set | @env['REQUEST_METHOD'] = @method |
Parse the request path an assign its parts to the env | {pathname, query} = url.parse @path
@env['PATH_INFO'] = pathname
@env['QUERY_STRING'] = query
@env['SCRIPT_NAME'] = "" |
Initialize | @env['REMOTE_ADDR'] = "0.0.0.0"
@env['SERVER_ADDR'] = "0.0.0.0" |
Parse the | if host = headers.host
if {name, port} = headers.host.split ':'
@env['SERVER_NAME'] = name
@env['SERVER_PORT'] = port
for key, value of headers |
Upcase all header key values | key = key.toUpperCase().replace /-/g, '_' |
Prepend | key = "HTTP_#{key}" unless key == 'CONTENT_TYPE' or key == 'CONTENT_LENGTH' |
And merge them into the | @env[key] = value |
Merge all | for key, value of metaVariables
@env[key] = value
assignSocket: (socket) ->
debug "socket assigned, flushing request"
@socket = @connection = socket
@_flush()
detachSocket: (socket) ->
@writeable = false
@socket = @connection = null |
Write chunk to client | write: (chunk, encoding) -> |
Netstring encode chunk | nsChunk = ns.nsWrite chunk, 0, chunk.length, null, 0, encoding
if @_writeQueue
debug "queueing #{nsChunk.length} bytes"
@_writeQueue.push nsChunk |
Return false because data was buffered | false
else if @connection
debug "writing #{nsChunk.length} bytes"
@connection.write nsChunk |
Closes writting socket. | end: (chunk, encoding) ->
if (chunk)
@write chunk, encoding
flushed = if @_writeQueue
debug "queueing close"
@_writeQueue.push END_OF_FILE |
Return false because data was buffered | false
else if @connection
debug "closing connection"
@connection.end END_OF_FILE
@detachSocket @socket
flushed
destroy: ->
@detachSocket @socket
@socket.destroy()
_flush: ->
while @_writeQueue and @_writeQueue.length
data = @_writeQueue.shift() |
Close write socket when we see an empty netstring | if data is END_OF_FILE
@socket.end data
else
debug "flushing #{data.length} bytes"
@socket.write data |
Clear queue, remaining writes won't buffer | @_writeQueue = null |
Buffer is empty, let the world know! | @emit 'drain'
true |
A ClientResponse is emitted from the client request's
It is a Readable Stream and emits the conventional events:
| exports.ClientResponse = class ClientResponse extends Stream
constructor: (@socket, @request) ->
@client = @socket
@readable = true
@received = false
@completed = false
@statusCode = null
@httpVersion = '1.1'
@headers = null
@_buffer = null
_receiveData: (data) ->
debug "received #{data.length} bytes"
return if !@readable or @completed
@received = true
try
if data.length > 0 |
The first response part is the status | if !@statusCode
@statusCode = parseInt data
assert.ok @statusCode >= 100, "Status must be >= 100" |
The second part is the JSON encoded headers | else if !@headers
@headers = {} |
Parse the headers | rawHeaders = JSON.parse data
assert.ok rawHeaders, "Headers can not be null"
assert.equal typeof rawHeaders, 'object', "Headers must be an object"
for k, vs of rawHeaders |
Split multiline Rack headers | v = vs.split "\n"
@headers[k] = if v.length > 0 |
Hack for node 0.2 headers http://github.com/ry/node/commit/6560ab9 | v.join "\r\n#{k}: "
else
vs
debug "response received: #{@statusCode}" |
Emit response once we've received the status and headers | @request.emit 'response', this |
Else its body parts | else if data.length > 0
@emit 'data', data |
Empty string means EOF | else
debug "response complete"
assert.ok @statusCode, "Missing status code"
assert.ok @headers, "Missing headers"
@readable = false
@completed = true
@emit 'end'
catch error |
See if payload is an exception backtrace | exception = try JSON.parse data
if exception and exception.name and exception.message
error = new Error exception.message
error.name = exception.name
error.stack = exception.stack
debug "response error", error |
Mark as not readable to stop parsing | @readable = false |
Catch and emit as a socket error | @socket.emit 'error', error
|