Broadcast project import output through json rpc protocol (#4888)

* Broadcast project import output through json rpc protocol

Signed-off-by: Vladyslav Zhukovskii <vzhukovskii@codenvy.com>

* Remove redundant text

Signed-off-by: Vladyslav Zhukovskii <vzhukovskii@codenvy.com>

* Code refactoring

* Code refactoring

* Name convention

* New line

* Refactor method body

* Add log call

* Extract hardcoded strings to the constants
6.19.x
Vladyslav Zhukovskii 2017-06-20 12:22:53 +03:00 committed by GitHub
parent c34d359921
commit b95d74c5bb
21 changed files with 803 additions and 114 deletions

View File

@ -134,6 +134,45 @@ public class RequestHandlerManager {
return methodToCategory.containsKey(method);
}
public synchronized boolean deregister(String method) {
Category category = methodToCategory.remove(method);
if (category == null) {
return false;
}
switch (category) {
case ONE_TO_ONE:
oneToOneHandlers.remove(method);
break;
case ONE_TO_MANY:
oneToManyHandlers.remove(method);
break;
case ONE_TO_NONE:
oneToNoneHandlers.remove(method);
break;
case MANY_TO_ONE:
manyToOneHandlers.remove(method);
break;
case MANY_TO_MANY:
manyToManyHandlers.remove(method);
break;
case MANY_TO_NONE:
manyToNoneHandlers.remove(method);
break;
case NONE_TO_ONE:
noneToOneHandlers.remove(method);
break;
case NONE_TO_MANY:
noneToManyHandlers.remove(method);
break;
case NONE_TO_NONE:
noneToNoneHandlers.remove(method);
}
return true;
}
public void handle(String endpointId, String requestId, String method, JsonRpcParams params) {
mustBeRegistered(method);

View File

@ -23,7 +23,7 @@ import org.eclipse.che.ide.api.project.wizard.ImportWizardRegistry;
import org.eclipse.che.ide.api.project.wizard.ProjectNotificationSubscriber;
import org.eclipse.che.ide.projectimport.wizard.ImportWizardFactory;
import org.eclipse.che.ide.projectimport.wizard.ImportWizardRegistryImpl;
import org.eclipse.che.ide.projectimport.wizard.ProjectNotificationSubscriberImpl;
import org.eclipse.che.ide.projectimport.wizard.ProjectImportOutputJsonRpcNotifier;
import org.eclipse.che.ide.projectimport.zip.ZipImportWizardRegistrar;
/**
@ -43,9 +43,8 @@ public class ProjectImportModule extends AbstractGinModule {
install(new GinFactoryModuleBuilder().build(ImportWizardFactory.class));
bind(ProjectNotificationSubscriber.class).to(ProjectNotificationSubscriberImpl.class).in(Singleton.class);
install(new GinFactoryModuleBuilder()
.implement(ProjectNotificationSubscriber.class, ProjectNotificationSubscriberImpl.class)
.implement(ProjectNotificationSubscriber.class, ProjectImportOutputJsonRpcNotifier.class)
.build(ImportProjectNotificationSubscriberFactory.class));
}
}

View File

@ -0,0 +1,105 @@
/*******************************************************************************
* Copyright (c) 2012-2017 Codenvy, S.A.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Codenvy, S.A. - initial API and implementation
*******************************************************************************/
package org.eclipse.che.ide.projectimport.wizard;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.web.bindery.event.shared.EventBus;
import org.eclipse.che.ide.CoreLocalizationConstant;
import org.eclipse.che.ide.api.machine.events.WsAgentStateEvent;
import org.eclipse.che.ide.api.machine.events.WsAgentStateHandler;
import org.eclipse.che.ide.api.notification.NotificationManager;
import org.eclipse.che.ide.api.notification.StatusNotification;
import org.eclipse.che.ide.api.project.wizard.ProjectNotificationSubscriber;
import static com.google.common.base.Strings.nullToEmpty;
import static org.eclipse.che.ide.api.notification.StatusNotification.DisplayMode.FLOAT_MODE;
import static org.eclipse.che.ide.api.notification.StatusNotification.Status.FAIL;
import static org.eclipse.che.ide.api.notification.StatusNotification.Status.PROGRESS;
import static org.eclipse.che.ide.api.notification.StatusNotification.Status.SUCCESS;
/**
* Json RPC based implementation of the {@link ProjectNotificationSubscriber} which notifies user
* about output events via popup notification.
*
* @author Vlad Zhukovskyi
* @since 5.9.0
*/
@Singleton
public class ProjectImportOutputJsonRpcNotifier implements ProjectNotificationSubscriber {
private final NotificationManager notificationManager;
private final ProjectImportOutputJsonRpcSubscriber subscriber;
private final CoreLocalizationConstant locale;
private StatusNotification singletonNotification;
private String projectName;
@Inject
public ProjectImportOutputJsonRpcNotifier(NotificationManager notificationManager,
ProjectImportOutputJsonRpcSubscriber subscriber,
CoreLocalizationConstant locale,
EventBus eventBus) {
this.notificationManager = notificationManager;
this.subscriber = subscriber;
this.locale = locale;
eventBus.addHandler(WsAgentStateEvent.TYPE, new WsAgentStateHandler() {
@Override
public void onWsAgentStarted(WsAgentStateEvent event) {
}
@Override
public void onWsAgentStopped(WsAgentStateEvent event) {
subscriber.unSubscribeForImportOutputEvents();
singletonNotification.setStatus(FAIL);
singletonNotification.setContent("");
}
});
}
@Override
public void subscribe(String projectName, StatusNotification notification) {
this.projectName = projectName;
this.singletonNotification = notification;
subscriber.subscribeForImportOutputEvents(progressRecord -> {
ProjectImportOutputJsonRpcNotifier.this.projectName = nullToEmpty(progressRecord.getProjectName());
singletonNotification.setTitle(locale.importingProject(ProjectImportOutputJsonRpcNotifier.this.projectName));
singletonNotification.setContent(nullToEmpty(progressRecord.getLine()));
});
}
@Override
public void subscribe(String projectName) {
singletonNotification = notificationManager.notify(locale.importingProject(projectName), PROGRESS, FLOAT_MODE);
subscribe(projectName, singletonNotification);
}
@Override
public void onSuccess() {
subscriber.unSubscribeForImportOutputEvents();
singletonNotification.setStatus(SUCCESS);
singletonNotification.setTitle(locale.importProjectMessageSuccess(projectName));
singletonNotification.setContent("");
}
@Override
public void onFailure(String errorMessage) {
subscriber.unSubscribeForImportOutputEvents();
singletonNotification.setStatus(FAIL);
singletonNotification.setContent(errorMessage);
}
}

View File

@ -0,0 +1,67 @@
/*******************************************************************************
* Copyright (c) 2012-2017 Codenvy, S.A.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Codenvy, S.A. - initial API and implementation
*******************************************************************************/
package org.eclipse.che.ide.projectimport.wizard;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import org.eclipse.che.api.core.jsonrpc.commons.RequestHandlerConfigurator;
import org.eclipse.che.api.core.jsonrpc.commons.RequestHandlerManager;
import org.eclipse.che.api.core.jsonrpc.commons.RequestTransmitter;
import org.eclipse.che.api.project.shared.ImportProgressRecord;
import org.eclipse.che.api.project.shared.dto.ImportProgressRecordDto;
import java.util.function.Consumer;
import static org.eclipse.che.api.project.shared.Constants.EVENT_IMPORT_OUTPUT_PROGRESS;
import static org.eclipse.che.api.project.shared.Constants.EVENT_IMPORT_OUTPUT_SUBSCRIBE;
import static org.eclipse.che.api.project.shared.Constants.EVENT_IMPORT_OUTPUT_UN_SUBSCRIBE;
/**
* Json RPC subscriber for listening to the project import events. Register itself for the listening events from the server side.
*
* @author Vlad Zhukovskyi
* @since 5.9.0
*/
@Singleton
public class ProjectImportOutputJsonRpcSubscriber {
public static final String WS_AGENT_ENDPOINT = "ws-agent";
private final RequestTransmitter transmitter;
private final RequestHandlerConfigurator configurator;
private final RequestHandlerManager requestHandlerManager;
@Inject
public ProjectImportOutputJsonRpcSubscriber(RequestTransmitter transmitter,
RequestHandlerConfigurator configurator,
RequestHandlerManager requestHandlerManager) {
this.transmitter = transmitter;
this.configurator = configurator;
this.requestHandlerManager = requestHandlerManager;
}
protected void subscribeForImportOutputEvents(Consumer<ImportProgressRecord> progressConsumer) {
transmitter.newRequest().endpointId(WS_AGENT_ENDPOINT).methodName(EVENT_IMPORT_OUTPUT_SUBSCRIBE).noParams().sendAndSkipResult();
configurator.newConfiguration()
.methodName(EVENT_IMPORT_OUTPUT_PROGRESS)
.paramsAsDto(ImportProgressRecordDto.class)
.noResult()
.withConsumer(progress -> progressConsumer.accept(progress));
}
protected void unSubscribeForImportOutputEvents() {
transmitter.newRequest().endpointId(WS_AGENT_ENDPOINT).methodName(EVENT_IMPORT_OUTPUT_UN_SUBSCRIBE).noParams().sendAndSkipResult();
requestHandlerManager.deregister(EVENT_IMPORT_OUTPUT_PROGRESS);
}
}

View File

@ -40,7 +40,9 @@ import static org.eclipse.che.ide.api.notification.StatusNotification.Status.SUC
* It can be produced by {@code ImportProjectNotificationSubscriberFactory}
*
* @author Anton Korneta
* @deprecated this class is going to be removed soon
*/
@Deprecated
@Singleton
public class ProjectNotificationSubscriberImpl implements ProjectNotificationSubscriber {

View File

@ -0,0 +1,134 @@
/*******************************************************************************
* Copyright (c) 2012-2017 Codenvy, S.A.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Codenvy, S.A. - initial API and implementation
*******************************************************************************/
package org.eclipse.che.ide.projectimport.wizard;
import com.google.gwtmockito.GwtMockitoTestRunner;
import com.google.web.bindery.event.shared.EventBus;
import org.eclipse.che.api.project.shared.ImportProgressRecord;
import org.eclipse.che.ide.CoreLocalizationConstant;
import org.eclipse.che.ide.api.notification.NotificationManager;
import org.eclipse.che.ide.api.notification.StatusNotification;
import org.eclipse.che.ide.api.notification.StatusNotification.DisplayMode;
import org.eclipse.che.ide.api.notification.StatusNotification.Status;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import java.util.function.Consumer;
import static org.eclipse.che.ide.api.notification.StatusNotification.Status.FAIL;
import static org.eclipse.che.ide.api.notification.StatusNotification.Status.SUCCESS;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Unit tests for {@link ProjectImportOutputJsonRpcNotifier}.
*
* @author Vlad Zhukovskyi
*/
@RunWith(GwtMockitoTestRunner.class)
public class ProjectImportOutputJsonRpcNotifierTest {
@Mock
NotificationManager notificationManager;
@Mock
ProjectImportOutputJsonRpcSubscriber subscriber;
@Mock
CoreLocalizationConstant constant;
@Mock
EventBus eventBus;
private ProjectImportOutputJsonRpcNotifier notifier;
@Before
public void setUp() throws Exception {
notifier = new ProjectImportOutputJsonRpcNotifier(notificationManager, subscriber, constant, eventBus);
}
@Test
public void testShouldSubscribeForDisplayingNotification() throws Exception {
//given
final ImportProgressRecord dto = new ImportProgressRecord() {
@Override
public int getNum() {
return 1;
}
@Override
public String getLine() {
return "message";
}
@Override
public String getProjectName() {
return "project";
}
};
final ArgumentCaptor<Consumer> argumentCaptor = ArgumentCaptor.forClass(Consumer.class);
final StatusNotification statusNotification = mock(StatusNotification.class);
when(notificationManager.notify(anyString(), any(Status.class), any(DisplayMode.class))).thenReturn(statusNotification);
when(constant.importingProject(anyString())).thenReturn("message");
//when
notifier.subscribe("project");
//then
verify(constant).importingProject(eq("project"));
verify(subscriber).subscribeForImportOutputEvents(argumentCaptor.capture());
argumentCaptor.getValue().accept(dto);
verify(statusNotification).setTitle(eq("message"));
verify(statusNotification).setContent(eq(dto.getLine()));
}
@Test
public void testShouldUnSubscribeFromDisplayingNotification() throws Exception {
//given
when(constant.importProjectMessageSuccess(anyString())).thenReturn("message");
final StatusNotification statusNotification = mock(StatusNotification.class);
when(notificationManager.notify(anyString(), any(Status.class), any(DisplayMode.class))).thenReturn(statusNotification);
//when
notifier.subscribe("project");
notifier.onSuccess();
//then
verify(subscriber).unSubscribeForImportOutputEvents();
verify(statusNotification).setStatus(eq(SUCCESS));
verify(statusNotification).setTitle(eq("message"));
verify(statusNotification).setContent(eq(""));
}
@Test
public void testShouldUnSubscribeFromDisplayingNotificationIfExceptionOccurred() throws Exception {
//given
final StatusNotification statusNotification = mock(StatusNotification.class);
when(notificationManager.notify(anyString(), any(Status.class), any(DisplayMode.class))).thenReturn(statusNotification);
//when
notifier.subscribe("project");
notifier.onFailure("message");
//then
verify(subscriber).unSubscribeForImportOutputEvents();
verify(statusNotification).setStatus(eq(FAIL));
verify(statusNotification).setContent(eq("message"));
}
}

View File

@ -37,6 +37,10 @@ public class Constants {
public static final String COMMANDS_ATTRIBUTE_NAME = "commands";
public static final String COMMANDS_ATTRIBUTE_DESCRIPTION = "Project-related commands";
public static final String EVENT_IMPORT_OUTPUT_SUBSCRIBE = "importProject/subscribe";
public static final String EVENT_IMPORT_OUTPUT_UN_SUBSCRIBE = "importProject/unSubscribe";
public static final String EVENT_IMPORT_OUTPUT_PROGRESS = "importProject/progress";
private Constants() {
}
}

View File

@ -0,0 +1,41 @@
/*******************************************************************************
* Copyright (c) 2012-2017 Codenvy, S.A.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Codenvy, S.A. - initial API and implementation
*******************************************************************************/
package org.eclipse.che.api.project.shared;
/**
* Progress record that holds output information about the importing status.
*
* @author Vlad Zhukovskyi
* @since 5.9.0
*/
public interface ImportProgressRecord {
/**
* Record line number.
*
* @return record line number
*/
int getNum();
/**
* Record line.
*
* @return record line
*/
String getLine();
/**
* Return project name.
*
* @return project name
*/
String getProjectName();
}

View File

@ -0,0 +1,35 @@
/*******************************************************************************
* Copyright (c) 2012-2017 Codenvy, S.A.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Codenvy, S.A. - initial API and implementation
*******************************************************************************/
package org.eclipse.che.api.project.shared.dto;
import org.eclipse.che.api.project.shared.ImportProgressRecord;
import org.eclipse.che.dto.shared.DTO;
/**
* DTO of {@link ImportProgressRecord}.
*
* @author Vlad Zhukovskyi
* @since 5.9.0
*/
@DTO
public interface ImportProgressRecordDto extends ImportProgressRecord {
void setNum(int num);
ImportProgressRecordDto withNum(int num);
void setLine(String line);
ImportProgressRecordDto withLine(String line);
void setProjectName(String projectName);
ImportProgressRecordDto withProjectName(String projectName);
}

View File

@ -19,6 +19,7 @@ import com.google.inject.name.Names;
import org.eclipse.che.api.project.server.handlers.CreateBaseProjectTypeHandler;
import org.eclipse.che.api.project.server.handlers.ProjectHandler;
import org.eclipse.che.api.project.server.importer.ProjectImportOutputJsonRpcRegistrar;
import org.eclipse.che.api.project.server.importer.ProjectImporter;
import org.eclipse.che.api.project.server.importer.ProjectImportersService;
import org.eclipse.che.api.project.server.type.BaseProjectType;
@ -75,6 +76,7 @@ public class ProjectApiModule extends AbstractModule {
bind(ProjectService.class);
bind(ProjectTypeService.class);
bind(ProjectImportersService.class);
bind(ProjectImportOutputJsonRpcRegistrar.class);
bind(WorkspaceProjectsSyncer.class).to(WorkspaceHolder.class);

View File

@ -44,6 +44,6 @@ public class ProjectOutputLineConsumerFactory implements LineConsumerFactory {
@Override
public LineConsumer newLineConsumer() {
return new ProjectImportOutputWSLineConsumer(projectName, workspaceId, delay);
return new ProjectImportOutputWSLineConsumer(projectName, delay);
}
}

View File

@ -25,11 +25,15 @@ import org.eclipse.che.api.core.ForbiddenException;
import org.eclipse.che.api.core.NotFoundException;
import org.eclipse.che.api.core.ServerException;
import org.eclipse.che.api.core.UnauthorizedException;
import org.eclipse.che.api.core.jsonrpc.commons.RequestTransmitter;
import org.eclipse.che.api.core.model.project.type.Value;
import org.eclipse.che.api.core.notification.EventService;
import org.eclipse.che.api.core.rest.Service;
import org.eclipse.che.api.core.rest.annotations.Description;
import org.eclipse.che.api.core.rest.annotations.GenerateLink;
import org.eclipse.che.api.core.util.CompositeLineConsumer;
import org.eclipse.che.api.project.server.importer.ProjectImportOutputJsonRpcLineConsumer;
import org.eclipse.che.api.project.server.importer.ProjectImportOutputJsonRpcRegistrar;
import org.eclipse.che.api.project.server.importer.ProjectImportOutputWSLineConsumer;
import org.eclipse.che.api.project.server.notification.ProjectItemModifiedEvent;
import org.eclipse.che.api.project.server.type.ProjectTypeResolution;
@ -105,18 +109,24 @@ public class ProjectService extends Service {
private static final Logger LOG = LoggerFactory.getLogger(ProjectService.class);
private static final Tika TIKA = new Tika();
private final ProjectManager projectManager;
private final EventService eventService;
private final ProjectServiceLinksInjector projectServiceLinksInjector;
private final String workspace;
private final ProjectManager projectManager;
private final EventService eventService;
private final ProjectServiceLinksInjector projectServiceLinksInjector;
private final RequestTransmitter transmitter;
private final ProjectImportOutputJsonRpcRegistrar projectImportHandlerRegistrar;
private final String workspace;
@Inject
public ProjectService(ProjectManager projectManager,
EventService eventService,
ProjectServiceLinksInjector projectServiceLinksInjector) {
ProjectServiceLinksInjector projectServiceLinksInjector,
RequestTransmitter transmitter,
ProjectImportOutputJsonRpcRegistrar projectImportHandlerRegistrar) {
this.projectManager = projectManager;
this.eventService = eventService;
this.projectServiceLinksInjector = projectServiceLinksInjector;
this.transmitter = transmitter;
this.projectImportHandlerRegistrar = projectImportHandlerRegistrar;
this.workspace = WorkspaceIdProvider.getWorkspaceId();
}
@ -351,8 +361,16 @@ public class ProjectService extends Service {
ServerException,
NotFoundException,
BadRequestException {
projectManager.importProject(path, sourceStorage, force,
() -> new ProjectImportOutputWSLineConsumer(path, workspace, 300));
final int delayBetweenMessages = 300;
final ProjectImportOutputWSLineConsumer wsLineConsumer =
new ProjectImportOutputWSLineConsumer(path, delayBetweenMessages);
final ProjectImportOutputJsonRpcLineConsumer rpcLineConsumer =
new ProjectImportOutputJsonRpcLineConsumer(path, transmitter, projectImportHandlerRegistrar, delayBetweenMessages);
projectManager.importProject(path, sourceStorage, force, () -> new CompositeLineConsumer(wsLineConsumer, rpcLineConsumer));
}
@POST

View File

@ -0,0 +1,87 @@
/*******************************************************************************
* Copyright (c) 2012-2017 Codenvy, S.A.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Codenvy, S.A. - initial API and implementation
*******************************************************************************/
package org.eclipse.che.api.project.server.importer;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.eclipse.che.api.core.util.LineConsumer;
import org.eclipse.che.commons.lang.concurrent.LoggingUncaughtExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Base abstraction for consuming project importing output events.
* <p>
* Consumes output lines into delayed queue and perform broadcasting the last ones through each type of implementation.
* There are only two implementation of broadcasting type represented by {@link ProjectImportOutputWSLineConsumer} which
* broadcasts events through the web socket and {@link ProjectImportOutputJsonRpcLineConsumer} which broadcasts events
* through the json rpc protocol.
*
* @author Vlad Zhukovskyi
* @since 5.9.0
*/
public abstract class BaseProjectImportOutputLineConsumer implements LineConsumer {
private static final Logger LOG = LoggerFactory.getLogger(BaseProjectImportOutputLineConsumer.class);
protected final String projectName;
protected final BlockingQueue<String> lineToSendQueue;
protected final ScheduledExecutorService executor;
public BaseProjectImportOutputLineConsumer(String projectName, int delayBetweenMessages) {
this.projectName = projectName;
lineToSendQueue = new ArrayBlockingQueue<>(1024);
executor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat(BaseProjectImportOutputLineConsumer.class.getSimpleName() + "-%d")
.setDaemon(true)
.setUncaughtExceptionHandler(LoggingUncaughtExceptionHandler.getInstance())
.build());
executor.scheduleAtFixedRate(() -> {
String lineToSend = null;
while (!lineToSendQueue.isEmpty()) {
lineToSend = lineToSendQueue.poll();
}
if (lineToSend == null) {
return;
}
sendOutputLine(lineToSend);
}, 0, delayBetweenMessages, TimeUnit.MILLISECONDS);
}
@Override
public void close() throws IOException {
executor.shutdown();
}
@Override
public void writeLine(String line) throws IOException {
try {
lineToSendQueue.put(line);
} catch (InterruptedException exception) {
LOG.info(exception.getLocalizedMessage());
}
}
/**
* Perform sending the given {@code outputLine} through the specific algorithm.
*
* @param outputLine
* output line message
*/
protected abstract void sendOutputLine(String outputLine);
}

View File

@ -0,0 +1,53 @@
/*******************************************************************************
* Copyright (c) 2012-2017 Codenvy, S.A.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Codenvy, S.A. - initial API and implementation
*******************************************************************************/
package org.eclipse.che.api.project.server.importer;
import org.eclipse.che.api.core.jsonrpc.commons.RequestTransmitter;
import org.eclipse.che.api.project.shared.dto.ImportProgressRecordDto;
import java.util.concurrent.atomic.AtomicInteger;
import static org.eclipse.che.api.project.shared.Constants.EVENT_IMPORT_OUTPUT_PROGRESS;
import static org.eclipse.che.dto.server.DtoFactory.newDto;
/**
* Importer output line consumer that perform broadcasting consumed output through the json rpc protocol to the specific method.
*
* @author Vlad Zhukovskyi
* @since 5.9.0
*/
public class ProjectImportOutputJsonRpcLineConsumer extends BaseProjectImportOutputLineConsumer {
private final AtomicInteger lineCounter;
private final RequestTransmitter transmitter;
private final ProjectImportOutputJsonRpcRegistrar endpointIdRegistrar;
public ProjectImportOutputJsonRpcLineConsumer(String projectName,
RequestTransmitter transmitter,
ProjectImportOutputJsonRpcRegistrar endpointIdRegistrar,
int delayBetweenMessages) {
super(projectName, delayBetweenMessages);
this.transmitter = transmitter;
this.endpointIdRegistrar = endpointIdRegistrar;
lineCounter = new AtomicInteger(1);
}
@Override
protected void sendOutputLine(String outputLine) {
final ImportProgressRecordDto progressRecord = newDto(ImportProgressRecordDto.class).withNum(lineCounter.getAndIncrement())
.withLine(outputLine)
.withProjectName(projectName);
endpointIdRegistrar.getRegisteredEndpoints()
.forEach(it -> transmitter.newRequest().endpointId(it).methodName(EVENT_IMPORT_OUTPUT_PROGRESS).paramsAsDto(progressRecord).sendAndSkipResult());
}
}

View File

@ -0,0 +1,56 @@
/*******************************************************************************
* Copyright (c) 2012-2017 Codenvy, S.A.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Codenvy, S.A. - initial API and implementation
*******************************************************************************/
package org.eclipse.che.api.project.server.importer;
import org.eclipse.che.api.core.jsonrpc.commons.RequestHandlerConfigurator;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.Set;
import static com.google.common.collect.Sets.newConcurrentHashSet;
import static org.eclipse.che.api.project.shared.Constants.EVENT_IMPORT_OUTPUT_SUBSCRIBE;
import static org.eclipse.che.api.project.shared.Constants.EVENT_IMPORT_OUTPUT_UN_SUBSCRIBE;
/**
* Endpoint registry for broadcasting project import events. Holds registered client's endpoint ids.
*
* @author Vlad Zhukovskyi
* @since 5.9.0
*/
@Singleton
public class ProjectImportOutputJsonRpcRegistrar {
private final Set<String> endpointIds = newConcurrentHashSet();
@Inject
private void configureSubscribeHandler(RequestHandlerConfigurator configurator) {
configurator.newConfiguration()
.methodName(EVENT_IMPORT_OUTPUT_SUBSCRIBE)
.noParams()
.noResult()
.withConsumer(endpointId -> endpointIds.add(endpointId));
}
@Inject
private void configureUnSubscribeHandler(RequestHandlerConfigurator configurator) {
configurator.newConfiguration()
.methodName(EVENT_IMPORT_OUTPUT_UN_SUBSCRIBE)
.noParams()
.noResult()
.withConsumer(endpointId -> endpointIds.remove(endpointId));
}
public Set<String> getRegisteredEndpoints() {
return newConcurrentHashSet(endpointIds);
}
}

View File

@ -10,88 +10,60 @@
*******************************************************************************/
package org.eclipse.che.api.project.server.importer;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.eclipse.che.api.core.util.LineConsumer;
import org.eclipse.che.commons.lang.concurrent.LoggingUncaughtExceptionHandler;
import org.everrest.websockets.WSConnectionContext;
import org.everrest.websockets.message.ChannelBroadcastMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Send project import output to WS by skipping output messages written below the delay specified.
* Importer output line consumer that perform broadcasting consumed output through the json rpc protocol to the specific method.
*
* @author Vlad Zhukovskyi
* @since 5.9.0
*/
public class ProjectImportOutputWSLineConsumer implements LineConsumer {
public class ProjectImportOutputWSLineConsumer extends BaseProjectImportOutputLineConsumer {
public static final String IMPORT_OUTPUT_CHANNEL = "importProject:output";
private static final Logger LOG = LoggerFactory.getLogger(ProjectImportOutputWSLineConsumer.class);
protected final AtomicInteger lineCounter;
protected final String projectName;
protected final String workspaceId;
protected final BlockingQueue<String> lineToSendQueue;
protected final ScheduledExecutorService executor;
private final AtomicInteger lineCounter;
public ProjectImportOutputWSLineConsumer(String projectName, int delayBetweenMessages) {
super(projectName, delayBetweenMessages);
public ProjectImportOutputWSLineConsumer(String projectName, String workspaceId, int delayBetweenMessages) {
this.projectName = projectName;
this.workspaceId = workspaceId;
lineToSendQueue = new ArrayBlockingQueue<>(1024);
executor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat(ProjectImportOutputWSLineConsumer.class.getSimpleName() + "-%d")
.setDaemon(true)
.setUncaughtExceptionHandler(LoggingUncaughtExceptionHandler.getInstance())
.build());
executor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
String lineToSend = null;
while (!lineToSendQueue.isEmpty()) {
lineToSend = lineToSendQueue.poll();
}
if (lineToSend == null) {
return;
}
sendMessage(lineToSend);
}
}, 0, delayBetweenMessages, TimeUnit.MILLISECONDS);
lineCounter = new AtomicInteger(1);
}
@Override
public void close() throws IOException {
executor.shutdown();
protected void sendOutputLine(String outputLine) {
sendMessage(outputLine);
}
@Override
public void writeLine(String line) throws IOException {
protected void sendMessage(String outputLine) {
doSendMessage(IMPORT_OUTPUT_CHANNEL, createMessageObject(outputLine));
}
protected JsonObject createMessageObject(String message) {
JsonObject jso = new JsonObject();
jso.addProperty("num", lineCounter.getAndIncrement());
jso.addProperty("line", message);
jso.addProperty("project", projectName);
return jso;
}
protected void doSendMessage(String channelId, JsonElement messageBody) {
try {
lineToSendQueue.put(line);
} catch (InterruptedException ignored) {
// ignore if interrupted
}
}
final ChannelBroadcastMessage bm = new ChannelBroadcastMessage();
bm.setChannel(channelId);
bm.setBody(messageBody.toString());
protected void sendMessage(String line) {
final ChannelBroadcastMessage bm = new ChannelBroadcastMessage();
bm.setChannel("importProject:output");
JsonObject json = new JsonObject();
json.addProperty("num", lineCounter.getAndIncrement());
json.addProperty("line", line);
json.addProperty("project", projectName);
bm.setBody(json.toString());
sendMessageToWS(bm);
}
protected void sendMessageToWS(final ChannelBroadcastMessage bm) {
try {
WSConnectionContext.sendMessage(bm);
} catch (Exception e) {
LOG.error("A problem occurred while sending websocket message", e);

View File

@ -838,7 +838,7 @@ public class ProjectManagerWriteTest extends WsAgentTestBase {
SourceStorage sourceConfig = DtoFactory.newDto(SourceStorageDto.class).withType(importType);
pm.importProject("/testImportProject", sourceConfig, false, () -> new ProjectImportOutputWSLineConsumer("BATCH", "ws", 300));
pm.importProject("/testImportProject", sourceConfig, false, () -> new ProjectImportOutputWSLineConsumer("BATCH", 300));
RegisteredProject project = projectRegistry.getProject("/testImportProject");
@ -860,7 +860,7 @@ public class ProjectManagerWriteTest extends WsAgentTestBase {
SourceStorage sourceConfig = DtoFactory.newDto(SourceStorageDto.class).withType(importType);
try {
pm.importProject(projectPath, sourceConfig, false, () -> new ProjectImportOutputWSLineConsumer("testImportProject", "ws", 300));
pm.importProject(projectPath, sourceConfig, false, () -> new ProjectImportOutputWSLineConsumer("testImportProject", 300));
} catch (Exception e) {
}
@ -873,7 +873,7 @@ public class ProjectManagerWriteTest extends WsAgentTestBase {
SourceStorage sourceConfig = DtoFactory.newDto(SourceStorageDto.class).withType("nothing");
try {
pm.importProject("/testImportProject", sourceConfig, false, () -> new ProjectImportOutputWSLineConsumer("testImportProject", "ws", 300));
pm.importProject("/testImportProject", sourceConfig, false, () -> new ProjectImportOutputWSLineConsumer("testImportProject", 300));
fail("NotFoundException: Unable import sources project from 'null'. Sources type 'nothing' is not supported.");
} catch (NotFoundException e) {
}

View File

@ -16,6 +16,7 @@ import org.eclipse.che.api.core.ConflictException;
import org.eclipse.che.api.core.ForbiddenException;
import org.eclipse.che.api.core.NotFoundException;
import org.eclipse.che.api.core.ServerException;
import org.eclipse.che.api.core.jsonrpc.commons.RequestTransmitter;
import org.eclipse.che.api.core.model.project.SourceStorage;
import org.eclipse.che.api.core.model.project.type.Attribute;
import org.eclipse.che.api.core.notification.EventService;
@ -28,6 +29,7 @@ import org.eclipse.che.api.core.util.LineConsumerFactory;
import org.eclipse.che.api.core.util.ValueHolder;
import org.eclipse.che.api.project.server.handlers.CreateProjectHandler;
import org.eclipse.che.api.project.server.handlers.ProjectHandlerRegistry;
import org.eclipse.che.api.project.server.importer.ProjectImportOutputJsonRpcRegistrar;
import org.eclipse.che.api.project.server.importer.ProjectImporter;
import org.eclipse.che.api.project.server.importer.ProjectImporterRegistry;
import org.eclipse.che.api.project.server.type.AttributeValue;
@ -280,6 +282,8 @@ public class ProjectServiceTest {
dependencies.addInstance(ProjectHandlerRegistry.class, phRegistry);
dependencies.addInstance(EventService.class, eventService);
dependencies.addInstance(ProjectServiceLinksInjector.class, projectServiceLinksInjector);
dependencies.addInstance(RequestTransmitter.class, mock(RequestTransmitter.class));
dependencies.addInstance(ProjectImportOutputJsonRpcRegistrar.class, new ProjectImportOutputJsonRpcRegistrar());
ResourceBinder resources = new ResourceBinderImpl();
ProviderBinder providers = ProviderBinder.getInstance();

View File

@ -0,0 +1,36 @@
/*******************************************************************************
* Copyright (c) 2012-2017 Codenvy, S.A.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Codenvy, S.A. - initial API and implementation
*******************************************************************************/
package org.eclipse.che.api.project.server.importer;
import org.testng.annotations.Test;
import java.io.IOException;
import static org.testng.Assert.assertEquals;
/**
* Unit tests for the {@link BaseProjectImportOutputLineConsumer}.
*
* @author Vlad Zhukovskyi
*/
public class BaseProjectImportOutputLineConsumerTest {
@Test
public void shouldSendOutputLine() throws IOException {
new BaseProjectImportOutputLineConsumer("project", 100) {
@Override
protected void sendOutputLine(String outputLine) {
assertEquals(outputLine, "message");
}
}.sendOutputLine("message");
}
}

View File

@ -0,0 +1,75 @@
/*******************************************************************************
* Copyright (c) 2012-2017 Codenvy, S.A.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Codenvy, S.A. - initial API and implementation
*******************************************************************************/
package org.eclipse.che.api.project.server.importer;
import org.eclipse.che.api.core.jsonrpc.commons.RequestTransmitter;
import org.eclipse.che.api.core.jsonrpc.commons.transmission.EndpointIdConfigurator;
import org.eclipse.che.api.core.jsonrpc.commons.transmission.MethodNameConfigurator;
import org.eclipse.che.api.core.jsonrpc.commons.transmission.ParamsConfigurator;
import org.eclipse.che.api.core.jsonrpc.commons.transmission.SendConfiguratorFromOne;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import java.util.Collections;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Unit test for the {@link ProjectImportOutputJsonRpcLineConsumer}.
*
* @author Vlad Zhukovskyi
*/
@RunWith(MockitoJUnitRunner.class)
public class ProjectImportOutputJsonRpcLineConsumerTest {
@Mock
RequestTransmitter requestTransmitter;
@Mock
ProjectImportOutputJsonRpcRegistrar registrar;
private ProjectImportOutputJsonRpcLineConsumer consumer;
@Before
public void setUp() throws Exception {
consumer = new ProjectImportOutputJsonRpcLineConsumer("project", requestTransmitter, registrar, 100);
}
@Test
public void testShouldSendOutputLineThroughJsonRpcToEndpoint() throws Exception {
//given
when(registrar.getRegisteredEndpoints()).thenReturn(Collections.singleton("endpointId"));
final EndpointIdConfigurator endpointIdConfigurator = mock(EndpointIdConfigurator.class);
when(requestTransmitter.newRequest()).thenReturn(endpointIdConfigurator);
final MethodNameConfigurator methodNameConfigurator = mock(MethodNameConfigurator.class);
when(endpointIdConfigurator.endpointId(anyString())).thenReturn(methodNameConfigurator);
final ParamsConfigurator paramsConfigurator = mock(ParamsConfigurator.class);
when(methodNameConfigurator.methodName(anyString())).thenReturn(paramsConfigurator);
final SendConfiguratorFromOne sendConfiguratorFromOne = mock(SendConfiguratorFromOne.class);
when(paramsConfigurator.paramsAsDto(any())).thenReturn(sendConfiguratorFromOne);
//when
consumer.sendOutputLine("message");
//then
verify(sendConfiguratorFromOne).sendAndSkipResult();
}
}

View File

@ -1,40 +0,0 @@
/*******************************************************************************
* Copyright (c) 2012-2017 Codenvy, S.A.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Codenvy, S.A. - initial API and implementation
*******************************************************************************/
package org.eclipse.che.api.project.server.importer;
import org.everrest.websockets.message.ChannelBroadcastMessage;
import org.mockito.ArgumentCaptor;
import org.testng.annotations.Test;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
/**
* @author Igor Vinokur
*/
public class ProjectImportOutputWSLineConsumerTest {
@Test
public void shouldSendMessage() {
//given
ArgumentCaptor<ChannelBroadcastMessage> argumentCaptor = ArgumentCaptor.forClass(ChannelBroadcastMessage.class);
ProjectImportOutputWSLineConsumer consumer = spy(new ProjectImportOutputWSLineConsumer("project", "workspace", 300));
//when
consumer.sendMessage("message");
//then
verify(consumer).sendMessageToWS(argumentCaptor.capture());
assertEquals(argumentCaptor.getValue().getChannel(), "importProject:output");
assertEquals(argumentCaptor.getValue().getBody(), "{\"num\":1,\"line\":\"message\",\"project\":\"project\"}");
}
}