CHE-10365: Add websocket reconnection feature to workspace loader app (#10533)

* CHE-10365: Add websocket reconnection feature to workspace loader app

Signed-off-by: Oleksii Kurinnyi <okurinny@redhat.com>

* fixup! CHE-10365: Add websocket reconnection feature to workspace loader app
6.19.x
Oleksii Kurinnyi 2018-07-31 14:49:33 +03:00 committed by GitHub
parent 94a049a30b
commit 0e49d6c2b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 166 additions and 33 deletions

View File

@ -25,7 +25,9 @@
"webpack-dev-server": "^2.11.1",
"webpack-merge": "^4.1.1"
},
"dependencies": {},
"dependencies": {
"reconnecting-websocket": "3.2.2"
},
"jest": {
"moduleFileExtensions": [
"ts",

View File

@ -122,7 +122,7 @@ export class WorkspaceLoader {
startAfterStopping = false;
constructor(private readonly loader: Loader,
private readonly keycloak: any) {
private readonly keycloak?: any) {
/** Ask dashboard to show the IDE. */
window.parent.postMessage("show-ide", "*");
}
@ -279,9 +279,15 @@ export class WorkspaceLoader {
* Subscribes to the workspace events.
*/
subscribeWorkspaceEvents() : Promise<void> {
let master = new CheJsonRpcMasterApi(new WebsocketClient());
const websocketClient = new WebsocketClient();
websocketClient.addListener('open', () => {
this.getWorkspace(this.workspace.id).then((workspace) => {
this.onWorkspaceStatusChanged(workspace.status ? workspace.status : '');
});
});
const entryPoint = this.websocketBaseURL() + WEBSOCKET_CONTEXT + this.getAuthenticationToken();
const master = new CheJsonRpcMasterApi(websocketClient, entryPoint);
return new Promise((resolve) => {
const entryPoint = this.websocketBaseURL() + WEBSOCKET_CONTEXT + this.getAuthenticationToken();
master.connect(entryPoint).then(() => {
master.subscribeEnvironmentOutput(this.workspace.id,
(message: any) => this.onEnvironmentOutput(message.text));
@ -289,7 +295,7 @@ export class WorkspaceLoader {
master.subscribeWorkspaceStatus(this.workspace.id,
(message: any) => {
if (message.error) {
this.loader.error(message.error);
this.loader.error(message.error);
} else {
this.onWorkspaceStatusChanged(message.status);
}

View File

@ -11,7 +11,7 @@
*/
'use strict';
import {CheJsonRpcApiClient} from './che-json-rpc-api-service';
import {ICommunicationClient} from './json-rpc-client';
import { ICommunicationClient, CODE_REQUEST_TIMEOUT } from './json-rpc-client';
enum MasterChannels {
ENVIRONMENT_OUTPUT = <any>'machine/log',
@ -31,19 +31,90 @@ export class CheJsonRpcMasterApi {
private cheJsonRpcApi: CheJsonRpcApiClient;
private clientId: string;
constructor (client: ICommunicationClient) {
private checkingInterval: number;
private checkingDelay = 10000;
private fetchingClientIdTimeout = 5000;
private client: ICommunicationClient;
constructor(client: ICommunicationClient,
entryPoint: string) {
this.cheJsonRpcApi = new CheJsonRpcApiClient(client);
this.client = client;
client.addListener('open', () => this.onConnectionOpen());
client.addListener('close', (event: any) => {
switch (event.code) {
case 1000: // normal close
break;
default:
this.connect(entryPoint);
}
});
}
onConnectionOpen(): void {
if (this.checkingInterval) {
clearInterval(this.checkingInterval);
this.checkingInterval = undefined;
}
this.checkingInterval = setInterval(() => {
let isAlive = false;
const fetchClientPromise = new Promise((resolve) => {
this.fetchClientId().then(() => {
isAlive = true;
resolve(isAlive);
}, () => {
isAlive = false;
resolve(isAlive);
});
});
// this is timeout of fetchClientId request
const fetchClientTimeoutPromise = new Promise((resolve) => {
setTimeout(() => {
resolve(isAlive);
}, this.fetchingClientIdTimeout);
});
Promise.race([fetchClientPromise, fetchClientTimeoutPromise]).then((isAlive: boolean) => {
if (isAlive) {
return;
}
clearInterval(this.checkingInterval);
this.checkingInterval = undefined;
this.client.disconnect(CODE_REQUEST_TIMEOUT);
});
}, this.checkingDelay);
}
/**
* Opens connection to pointed entrypoint.
* Opens connection to pointed entryPoint.
*
* @param entrypoint
* @param {string} entryPoint
* @returns {IPromise<IHttpPromiseCallbackArg<any>>}
*/
connect(entrypoint: string): Promise<any> {
return this.cheJsonRpcApi.connect(entrypoint).then(() => {
connect(entryPoint: string): Promise<any> {
if (this.clientId) {
let clientId = `clientId=${this.clientId}`;
// in case of reconnection
// we need to test entrypoint on existing query parameters
// to add already gotten clientId
if (/\?/.test(entryPoint) === false) {
clientId = '?' + clientId;
} else {
clientId = '&' + clientId;
}
entryPoint += clientId;
}
return this.cheJsonRpcApi.connect(entryPoint).then(() => {
return this.fetchClientId();
}).catch((error: any) => {
console.error(`Failed to connect to ${entryPoint}:`, error);
});
}
@ -135,7 +206,7 @@ export class CheJsonRpcMasterApi {
}
/**
* Fetch client's id and strores it.
* Fetch client's id and stores it.
*
* @returns {IPromise<TResult>}
*/

View File

@ -13,15 +13,26 @@
import { IDeffered, Deffered } from './util';
const JSON_RPC_VERSION: string = '2.0';
export type CommunicationClientEvent = 'close' | 'error' | 'open' | 'message';
export const CODE_REQUEST_TIMEOUT = 4000;
/**
* Interface for communication between two entrypoints.
* The implementation can be through websocket or http protocol.
*/
export interface ICommunicationClient {
/**
* Process responses.
* Adds listener callbacks for specified client event.
* @param {CommunicationClientEvent} eventType an event type
* @param {Function} handler a callback function
*/
onResponse: Function;
addListener(eventType: CommunicationClientEvent, handler: Function): void;
/**
* Removes listener.
* @param {CommunicationClientEvent} eventType an event type
* @param {Function} handler a callback function
*/
removeListener(eventType: CommunicationClientEvent, handler: Function): void;
/**
* Performs connections.
*
@ -30,8 +41,9 @@ export interface ICommunicationClient {
connect(entrypoint: string): Promise<any>;
/**
* Close the connection.
* @param {number} code close code
*/
disconnect(): void;
disconnect(code?: number): void;
/**
* Send pointed data.
*
@ -78,9 +90,9 @@ export class JsonRpcClient {
this.pendingRequests = new Map<string, IDeffered<any>>();
this.notificationHandlers = new Map<string, Array<Function>>();
this.client.onResponse = (message: any): void => {
this.client.addListener("message", (message: any) => {
this.processResponse(message);
};
});
}
/**

View File

@ -10,7 +10,9 @@
* Red Hat, Inc. - initial API and implementation
*/
'use strict';
import { ICommunicationClient } from './json-rpc-client';
import { ICommunicationClient, CommunicationClientEvent } from './json-rpc-client';
import * as ReconnectingWebsocket from 'reconnecting-websocket';
const RWS = require('reconnecting-websocket');
/**
* The implementation for JSON RPC protocol communication through websocket.
@ -18,12 +20,8 @@ import { ICommunicationClient } from './json-rpc-client';
* @author Ann Shumilova
*/
export class WebsocketClient implements ICommunicationClient {
onResponse: Function;
private websocketStream: WebSocket;
constructor() {
}
private websocketStream: ReconnectingWebsocket;
private handlers: {[event: string]: Function[]} = {};
/**
* Performs connection to the pointed entrypoint.
@ -32,28 +30,66 @@ export class WebsocketClient implements ICommunicationClient {
*/
connect(entrypoint: string): Promise<void> {
return new Promise((resolve, reject) => {
this.websocketStream = new WebSocket(entrypoint);
this.websocketStream.addEventListener("open", () => {
this.websocketStream = new RWS(entrypoint, [], {});
this.websocketStream.addEventListener("open", (event: Event) => {
const eventType: CommunicationClientEvent = "open";
this.callHandlers(eventType, event);
resolve();
});
this.websocketStream.addEventListener("error", () => {
this.websocketStream.addEventListener("error", (event: Event) => {
const eventType: CommunicationClientEvent = "error";
this.callHandlers(eventType, event);
reject();
});
this.websocketStream.addEventListener("message", (message) => {
let data = JSON.parse(message.data);
this.onResponse(data);
this.websocketStream.addEventListener("message", (message: any) => {
const data = JSON.parse(message.data);
const eventType: CommunicationClientEvent = "message";
this.callHandlers(eventType, data);
});
this.websocketStream.addEventListener("close", (event: Event) => {
const eventType: CommunicationClientEvent = "close";
this.callHandlers(eventType, event);
});
});
}
/**
* Adds a listener on an event.
*
* @param {communicationClientEvent} event
* @param {Function} handler
*/
addListener(event: CommunicationClientEvent, handler: Function): void {
if (!this.handlers[event]) {
this.handlers[event] = [];
}
this.handlers[event].push(handler);
}
/**
* Removes a listener.
*
* @param {communicationClientEvent} eventType
* @param {Function} handler
*/
removeListener(eventType: CommunicationClientEvent, handler: Function): void {
if (!this.handlers[eventType] || !handler) {
return;
}
const index = this.handlers[eventType].indexOf(handler);
if (index === -1) {
return;
}
this.handlers[eventType].splice(index, 1);
}
/**
* Performs closing the connection.
* @param {number} code close code
*/
disconnect(): void {
disconnect(code?: number): void {
if (this.websocketStream) {
this.websocketStream.close();
this.websocketStream.close(code ? code : undefined);
}
}
@ -65,4 +101,10 @@ export class WebsocketClient implements ICommunicationClient {
send(data: any): void {
this.websocketStream.send(JSON.stringify(data));
}
private callHandlers(event: CommunicationClientEvent, data?: any): void {
if (this.handlers[event] && this.handlers[event].length > 0) {
this.handlers[event].forEach((handler: Function) => handler(data));
}
}
}