diff --git a/workspace-loader/package.json b/workspace-loader/package.json index 2e6ef48b40..fde3a8fc23 100644 --- a/workspace-loader/package.json +++ b/workspace-loader/package.json @@ -25,7 +25,9 @@ "webpack-dev-server": "^2.11.1", "webpack-merge": "^4.1.1" }, - "dependencies": {}, + "dependencies": { + "reconnecting-websocket": "3.2.2" + }, "jest": { "moduleFileExtensions": [ "ts", diff --git a/workspace-loader/src/index.ts b/workspace-loader/src/index.ts index 3fb656ac43..d01ff52788 100644 --- a/workspace-loader/src/index.ts +++ b/workspace-loader/src/index.ts @@ -122,7 +122,7 @@ export class WorkspaceLoader { startAfterStopping = false; constructor(private readonly loader: Loader, - private readonly keycloak: any) { + private readonly keycloak?: any) { /** Ask dashboard to show the IDE. */ window.parent.postMessage("show-ide", "*"); } @@ -279,9 +279,15 @@ export class WorkspaceLoader { * Subscribes to the workspace events. */ subscribeWorkspaceEvents() : Promise { - let master = new CheJsonRpcMasterApi(new WebsocketClient()); + const websocketClient = new WebsocketClient(); + websocketClient.addListener('open', () => { + this.getWorkspace(this.workspace.id).then((workspace) => { + this.onWorkspaceStatusChanged(workspace.status ? workspace.status : ''); + }); + }); + const entryPoint = this.websocketBaseURL() + WEBSOCKET_CONTEXT + this.getAuthenticationToken(); + const master = new CheJsonRpcMasterApi(websocketClient, entryPoint); return new Promise((resolve) => { - const entryPoint = this.websocketBaseURL() + WEBSOCKET_CONTEXT + this.getAuthenticationToken(); master.connect(entryPoint).then(() => { master.subscribeEnvironmentOutput(this.workspace.id, (message: any) => this.onEnvironmentOutput(message.text)); @@ -289,7 +295,7 @@ export class WorkspaceLoader { master.subscribeWorkspaceStatus(this.workspace.id, (message: any) => { if (message.error) { - this.loader.error(message.error); + this.loader.error(message.error); } else { this.onWorkspaceStatusChanged(message.status); } diff --git a/workspace-loader/src/json-rpc/che-json-rpc-master-api.ts b/workspace-loader/src/json-rpc/che-json-rpc-master-api.ts index cbe655838b..67fa3e2bfe 100644 --- a/workspace-loader/src/json-rpc/che-json-rpc-master-api.ts +++ b/workspace-loader/src/json-rpc/che-json-rpc-master-api.ts @@ -11,7 +11,7 @@ */ 'use strict'; import {CheJsonRpcApiClient} from './che-json-rpc-api-service'; -import {ICommunicationClient} from './json-rpc-client'; +import { ICommunicationClient, CODE_REQUEST_TIMEOUT } from './json-rpc-client'; enum MasterChannels { ENVIRONMENT_OUTPUT = 'machine/log', @@ -31,19 +31,90 @@ export class CheJsonRpcMasterApi { private cheJsonRpcApi: CheJsonRpcApiClient; private clientId: string; - constructor (client: ICommunicationClient) { + private checkingInterval: number; + private checkingDelay = 10000; + private fetchingClientIdTimeout = 5000; + + private client: ICommunicationClient; + + constructor(client: ICommunicationClient, + entryPoint: string) { this.cheJsonRpcApi = new CheJsonRpcApiClient(client); + this.client = client; + + client.addListener('open', () => this.onConnectionOpen()); + client.addListener('close', (event: any) => { + switch (event.code) { + case 1000: // normal close + break; + default: + this.connect(entryPoint); + } + }); + } + + onConnectionOpen(): void { + if (this.checkingInterval) { + clearInterval(this.checkingInterval); + this.checkingInterval = undefined; + } + + this.checkingInterval = setInterval(() => { + let isAlive = false; + const fetchClientPromise = new Promise((resolve) => { + this.fetchClientId().then(() => { + isAlive = true; + resolve(isAlive); + }, () => { + isAlive = false; + resolve(isAlive); + }); + }); + + // this is timeout of fetchClientId request + const fetchClientTimeoutPromise = new Promise((resolve) => { + setTimeout(() => { + resolve(isAlive); + }, this.fetchingClientIdTimeout); + }); + + Promise.race([fetchClientPromise, fetchClientTimeoutPromise]).then((isAlive: boolean) => { + if (isAlive) { + return; + } + + clearInterval(this.checkingInterval); + this.checkingInterval = undefined; + + this.client.disconnect(CODE_REQUEST_TIMEOUT); + }); + + }, this.checkingDelay); } /** - * Opens connection to pointed entrypoint. + * Opens connection to pointed entryPoint. * - * @param entrypoint + * @param {string} entryPoint * @returns {IPromise>} */ - connect(entrypoint: string): Promise { - return this.cheJsonRpcApi.connect(entrypoint).then(() => { + connect(entryPoint: string): Promise { + if (this.clientId) { + let clientId = `clientId=${this.clientId}`; + // in case of reconnection + // we need to test entrypoint on existing query parameters + // to add already gotten clientId + if (/\?/.test(entryPoint) === false) { + clientId = '?' + clientId; + } else { + clientId = '&' + clientId; + } + entryPoint += clientId; + } + return this.cheJsonRpcApi.connect(entryPoint).then(() => { return this.fetchClientId(); + }).catch((error: any) => { + console.error(`Failed to connect to ${entryPoint}:`, error); }); } @@ -135,7 +206,7 @@ export class CheJsonRpcMasterApi { } /** - * Fetch client's id and strores it. + * Fetch client's id and stores it. * * @returns {IPromise} */ diff --git a/workspace-loader/src/json-rpc/json-rpc-client.ts b/workspace-loader/src/json-rpc/json-rpc-client.ts index 8cf2f2271c..904b4e87b9 100644 --- a/workspace-loader/src/json-rpc/json-rpc-client.ts +++ b/workspace-loader/src/json-rpc/json-rpc-client.ts @@ -13,15 +13,26 @@ import { IDeffered, Deffered } from './util'; const JSON_RPC_VERSION: string = '2.0'; +export type CommunicationClientEvent = 'close' | 'error' | 'open' | 'message'; +export const CODE_REQUEST_TIMEOUT = 4000; + /** * Interface for communication between two entrypoints. * The implementation can be through websocket or http protocol. */ export interface ICommunicationClient { /** - * Process responses. + * Adds listener callbacks for specified client event. + * @param {CommunicationClientEvent} eventType an event type + * @param {Function} handler a callback function */ - onResponse: Function; + addListener(eventType: CommunicationClientEvent, handler: Function): void; + /** + * Removes listener. + * @param {CommunicationClientEvent} eventType an event type + * @param {Function} handler a callback function + */ + removeListener(eventType: CommunicationClientEvent, handler: Function): void; /** * Performs connections. * @@ -30,8 +41,9 @@ export interface ICommunicationClient { connect(entrypoint: string): Promise; /** * Close the connection. + * @param {number} code close code */ - disconnect(): void; + disconnect(code?: number): void; /** * Send pointed data. * @@ -78,9 +90,9 @@ export class JsonRpcClient { this.pendingRequests = new Map>(); this.notificationHandlers = new Map>(); - this.client.onResponse = (message: any): void => { + this.client.addListener("message", (message: any) => { this.processResponse(message); - }; + }); } /** diff --git a/workspace-loader/src/json-rpc/websocket-client.ts b/workspace-loader/src/json-rpc/websocket-client.ts index f390f80601..4f3730a4af 100644 --- a/workspace-loader/src/json-rpc/websocket-client.ts +++ b/workspace-loader/src/json-rpc/websocket-client.ts @@ -10,7 +10,9 @@ * Red Hat, Inc. - initial API and implementation */ 'use strict'; -import { ICommunicationClient } from './json-rpc-client'; +import { ICommunicationClient, CommunicationClientEvent } from './json-rpc-client'; +import * as ReconnectingWebsocket from 'reconnecting-websocket'; +const RWS = require('reconnecting-websocket'); /** * The implementation for JSON RPC protocol communication through websocket. @@ -18,12 +20,8 @@ import { ICommunicationClient } from './json-rpc-client'; * @author Ann Shumilova */ export class WebsocketClient implements ICommunicationClient { - onResponse: Function; - private websocketStream: WebSocket; - - constructor() { - - } + private websocketStream: ReconnectingWebsocket; + private handlers: {[event: string]: Function[]} = {}; /** * Performs connection to the pointed entrypoint. @@ -32,28 +30,66 @@ export class WebsocketClient implements ICommunicationClient { */ connect(entrypoint: string): Promise { return new Promise((resolve, reject) => { - this.websocketStream = new WebSocket(entrypoint); - this.websocketStream.addEventListener("open", () => { + this.websocketStream = new RWS(entrypoint, [], {}); + this.websocketStream.addEventListener("open", (event: Event) => { + const eventType: CommunicationClientEvent = "open"; + this.callHandlers(eventType, event); resolve(); }); - - this.websocketStream.addEventListener("error", () => { + this.websocketStream.addEventListener("error", (event: Event) => { + const eventType: CommunicationClientEvent = "error"; + this.callHandlers(eventType, event); reject(); }); - this.websocketStream.addEventListener("message", (message) => { - let data = JSON.parse(message.data); - this.onResponse(data); + this.websocketStream.addEventListener("message", (message: any) => { + const data = JSON.parse(message.data); + const eventType: CommunicationClientEvent = "message"; + this.callHandlers(eventType, data); + }); + this.websocketStream.addEventListener("close", (event: Event) => { + const eventType: CommunicationClientEvent = "close"; + this.callHandlers(eventType, event); }); }); + } + /** + * Adds a listener on an event. + * + * @param {communicationClientEvent} event + * @param {Function} handler + */ + addListener(event: CommunicationClientEvent, handler: Function): void { + if (!this.handlers[event]) { + this.handlers[event] = []; + } + this.handlers[event].push(handler); + } + + /** + * Removes a listener. + * + * @param {communicationClientEvent} eventType + * @param {Function} handler + */ + removeListener(eventType: CommunicationClientEvent, handler: Function): void { + if (!this.handlers[eventType] || !handler) { + return; + } + const index = this.handlers[eventType].indexOf(handler); + if (index === -1) { + return; + } + this.handlers[eventType].splice(index, 1); } /** * Performs closing the connection. + * @param {number} code close code */ - disconnect(): void { + disconnect(code?: number): void { if (this.websocketStream) { - this.websocketStream.close(); + this.websocketStream.close(code ? code : undefined); } } @@ -65,4 +101,10 @@ export class WebsocketClient implements ICommunicationClient { send(data: any): void { this.websocketStream.send(JSON.stringify(data)); } + + private callHandlers(event: CommunicationClientEvent, data?: any): void { + if (this.handlers[event] && this.handlers[event].length > 0) { + this.handlers[event].forEach((handler: Function) => handler(data)); + } + } }