New timeout system based on heartbeats

This commit is contained in:
Guillermo Rauch
2010-07-19 01:55:44 -07:00
parent 8bf30afda2
commit 1aa7845c62
4 changed files with 68 additions and 53 deletions

View File

@@ -5,8 +5,8 @@ exports.Client = Class({
include: [options],
options: {
closeTimeout: 0,
heartbeatInterval: 5000
timeout: 12000,
closeTimeout: 0
},
init: function(listener, req, res, options, head){
@@ -14,17 +14,17 @@ exports.Client = Class({
this.setOptions(options);
this.connections = 0;
this.connected = false;
this._heartbeats = 0;
this.upgradeHead = head;
this._onConnect(req, res);
},
send: function(message){
if (!this.connected || !(this.connection.readyState === 'open' ||
this.connection.readyState === 'writeOnly')) {
this.connection.readyState === 'writeOnly')){
return this._queue(message);
}
this._write(JSON.stringify({messages: [message]}));
this._write(this._encode(message));
return this;
},
@@ -37,14 +37,17 @@ exports.Client = Class({
},
_onMessage: function(data){
try {
var messages = JSON.parse(data);
} catch(e){
return this.listener.options.log('Bad message received from client ' + this.sessionId);
if (data.substr(0, 3) == '\ufffdm\ufffd'){
var messages = this._decode(data);
} else {
return this.listener.options.log('Bad message received from client ' + this.sessionId);
}
for (var i = 0, l = messages.length; i < l; i++){
if (messages[i].substr(0, 3) == '\ufffdh\ufffd'){
return this._onHeartbeat(data.substr(3));
}
this.listener._onClientMessage(messages[i], this);
}
}
},
_onConnect: function(req, res){
@@ -52,60 +55,88 @@ exports.Client = Class({
this.request = req;
this.response = res;
this.connection = this.request.connection;
if (this._disconnectTimeout) {
clearTimeout(this._disconnectTimeout);
}
if (this._disconnectTimeout) clearTimeout(this._disconnectTimeout);
},
_encode: function(messages){
var ret = '',
messages = messages instanceof Array ? messages : [];
for (var i = 0, l = messages.length; i < l; i++){
ret += messages[i].length + message;
}
return '\ufffdm\ufffd' + ret;
},
_decode: function(data){
var messages = [];
do(){
for (var i = 0, n, number = '';; i++;){
var n = data.substr(i, 1);
if (Number(n) != n){
number = Number(number);
break;
}
number += n;
}
messages.push(data.substr(i, i + number)); // here
data = data.substr(i + number);
} while(data !== '');
return messages;
},
_payload: function(){
var payload = [];
this.connections++;
this.connected = true;
if (!this.handshaked){
this._generateSessionId();
payload.push(JSON.stringify({
sessionid: this.sessionId
}));
payload.push(this.sessionId);
this.handshaked = true;
}
payload = payload.concat(this._writeQueue || []);
this._writeQueue = [];
if (payload.length) {
this._write(JSON.stringify({messages: payload}));
}
if (this.connections === 1) {
this.listener._onClientConnect(this);
}
if (payload.length) this._write(this._encode(payload));
if (this.connections === 1) this.listener._onClientConnect(this);
if (this.listener.options.timeout) this._heartbeat();
},
_heartbeat: function(){
var self = this;
this.send('\ufffdh\ufffd' + ++this._heartbeats);
this._heartbitTimeout = setTimeout(function(){
self.close();
}, this.listener.options.timeout);
},
_onHeartbeat: function(h){
if (h === this._heartbeats) clearTimeout(this._heartbitTimeout);
},
_onClose: function(){
var self = this;
if (this._heartbeatInterval) {
clearInterval(this._heartbeatInterval);
}
clearTimeout(this._heartbeatTimeout);
this.connected = false;
this._disconnectTimeout = setTimeout(function(){
self._onDisconnect();
}, this.options.closeTimeout);
},
_onDisconnect: function(){
_onDisconnect: function(){
if (!this.finalized){
this._writeQueue = [];
this.connected = false;
this.finalized = true;
if (this.handshaked) {
this.listener._onClientDisconnect(this);
}
if (this.handshaked) this.listener._onClientDisconnect(this);
}
},
_queue: function(message){
if (!('_writeQueue' in this)) {
if (!('_writeQueue' in this)){
this._writeQueue = [];
}
this._writeQueue.push(message);
@@ -113,9 +144,7 @@ exports.Client = Class({
},
_generateSessionId: function(){
if (this.sessionId) {
return this.listener.options.log('This client already has a session id');
}
if (this.sessionId) return this.listener.options.log('This client already has a session id');
this.sessionId = Math.random().toString().substr(2);
return this;
},

View File

@@ -16,13 +16,8 @@ exports.htmlfile = Client.extend({
'Connection': 'keep-alive',
'Transfer-Encoding': 'chunked'
});
this.response.write('<html><body>' + new Array(244).join(' '));
if ('flush' in this.response) this.response.flush();
this._payload();
this._heartbeatInterval = setInterval(function(){
self.response.write('<!-- heartbeat -->');
if ('flush' in self.response) self.response.flush();
}, this.options.heartbeatInterval);
break;
case 'POST':

View File

@@ -60,13 +60,7 @@ exports.websocket = Client.extend({
this.connection.addListener('end', function(){self._onClose();});
this.connection.addListener('data', function(data){self._handle(data);});
if (this._proveReception(headers)){
this._payload();
}
setInterval(function(){
self._write(JSON.stringify({heartbeat: '1'}));
}, 10000);
if (this._proveReception(headers)) this._payload();
},
_handle: function(data){

View File

@@ -36,9 +36,6 @@ exports['xhr-multipart'] = Client.extend({
this.response.write("--socketio\n");
if ('flush' in this.response) this.response.flush();
this._payload();
this._heartbeatInterval = setInterval(function(){
self._write(String.fromCharCode(6));
}, this.options.heartbeatInterval);
break;
case 'POST':