parent
43bc566253
commit
ac932ba2cf
|
|
@ -28,20 +28,19 @@ import org.apache.dolphinscheduler.remote.command.Command;
|
|||
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
|
||||
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
|
||||
import org.apache.dolphinscheduler.remote.exceptions.RemotingTimeoutException;
|
||||
import org.apache.dolphinscheduler.remote.exceptions.RemotingTooMuchRequestException;
|
||||
import org.apache.dolphinscheduler.remote.future.InvokeCallback;
|
||||
import org.apache.dolphinscheduler.remote.future.ReleaseSemaphore;
|
||||
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
|
||||
import org.apache.dolphinscheduler.remote.handler.NettyClientHandler;
|
||||
import org.apache.dolphinscheduler.remote.utils.Address;
|
||||
import org.apache.dolphinscheduler.remote.utils.Constants;
|
||||
import org.apache.dolphinscheduler.remote.utils.CallerThreadExecutePolicy;
|
||||
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.rmi.RemoteException;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
|
|
@ -56,18 +55,20 @@ public class NettyRemotingClient {
|
|||
|
||||
private final NettyEncoder encoder = new NettyEncoder();
|
||||
|
||||
private final ConcurrentHashMap<Address, Channel> channels = new ConcurrentHashMap();
|
||||
|
||||
private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);
|
||||
private final ConcurrentHashMap<Address, Channel> channels = new ConcurrentHashMap(128);
|
||||
|
||||
private final AtomicBoolean isStarted = new AtomicBoolean(false);
|
||||
|
||||
private final NioEventLoopGroup workerGroup;
|
||||
|
||||
private final NettyClientHandler clientHandler = new NettyClientHandler(this);
|
||||
|
||||
private final NettyClientConfig clientConfig;
|
||||
|
||||
private final Semaphore asyncSemaphore = new Semaphore(200, true);
|
||||
|
||||
private final ExecutorService callbackExecutor;
|
||||
|
||||
private final NettyClientHandler clientHandler;
|
||||
|
||||
public NettyRemotingClient(final NettyClientConfig clientConfig){
|
||||
this.clientConfig = clientConfig;
|
||||
this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
|
||||
|
|
@ -78,6 +79,10 @@ public class NettyRemotingClient {
|
|||
return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
|
||||
}
|
||||
});
|
||||
this.callbackExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES,
|
||||
new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10), new CallerThreadExecutePolicy());
|
||||
this.clientHandler = new NettyClientHandler(this, callbackExecutor);
|
||||
|
||||
this.start();
|
||||
}
|
||||
|
||||
|
|
@ -103,65 +108,79 @@ public class NettyRemotingClient {
|
|||
isStarted.compareAndSet(false, true);
|
||||
}
|
||||
|
||||
//TODO
|
||||
public void send(final Address address, final Command command, final InvokeCallback invokeCallback) throws RemotingException {
|
||||
public void sendAsync(final Address address, final Command command, final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException, RemotingException {
|
||||
final Channel channel = getChannel(address);
|
||||
if (channel == null) {
|
||||
throw new RemotingException("network error");
|
||||
}
|
||||
try {
|
||||
channel.writeAndFlush(command).addListener(new ChannelFutureListener(){
|
||||
final long opaque = command.getOpaque();
|
||||
boolean acquired = this.asyncSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
|
||||
if(acquired){
|
||||
final ReleaseSemaphore releaseSemaphore = new ReleaseSemaphore(this.asyncSemaphore);
|
||||
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, releaseSemaphore);
|
||||
try {
|
||||
channel.writeAndFlush(command).addListener(new ChannelFutureListener(){
|
||||
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
if(future.isSuccess()){
|
||||
logger.info("sent command {} to {}", command, address);
|
||||
} else{
|
||||
logger.error("send command {} to {} failed, error {}", command, address, future.cause());
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
if(future.isSuccess()){
|
||||
responseFuture.setSendOk(true);
|
||||
return;
|
||||
} else {
|
||||
responseFuture.setSendOk(false);
|
||||
}
|
||||
responseFuture.setCause(future.cause());
|
||||
responseFuture.putResponse(null);
|
||||
try {
|
||||
responseFuture.executeInvokeCallback();
|
||||
} catch (Throwable ex){
|
||||
logger.error("execute callback error", ex);
|
||||
} finally{
|
||||
responseFuture.release();
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
} catch (Exception ex) {
|
||||
String msg = String.format("send command %s to address %s encounter error", command, address);
|
||||
throw new RemotingException(msg, ex);
|
||||
});
|
||||
} catch (Throwable ex){
|
||||
responseFuture.release();
|
||||
throw new RemotingException(String.format("send command to address: %s failed", address), ex);
|
||||
}
|
||||
} else{
|
||||
String message = String.format("try to acquire async semaphore timeout: %d, waiting thread num: %d, total permits: %d",
|
||||
timeoutMillis, asyncSemaphore.getQueueLength(), asyncSemaphore.availablePermits());
|
||||
throw new RemotingTooMuchRequestException(message);
|
||||
}
|
||||
}
|
||||
|
||||
public Command sendSync(final Address address, final Command command, final long timeoutMillis) throws RemotingException {
|
||||
public Command sendSync(final Address address, final Command command, final long timeoutMillis) throws InterruptedException, RemotingException {
|
||||
final Channel channel = getChannel(address);
|
||||
if (channel == null) {
|
||||
throw new RemotingException(String.format("connect to : %s fail", address));
|
||||
}
|
||||
final long opaque = command.getOpaque();
|
||||
try {
|
||||
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null);
|
||||
channel.writeAndFlush(command).addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture channelFuture) throws Exception {
|
||||
if(channelFuture.isSuccess()){
|
||||
responseFuture.setSendOk(true);
|
||||
return;
|
||||
} else{
|
||||
responseFuture.setSendOk(false);
|
||||
responseFuture.setCause(channelFuture.cause());
|
||||
responseFuture.putResponse(null);
|
||||
logger.error("send command {} to address {} failed", command, address);
|
||||
}
|
||||
}
|
||||
});
|
||||
Command result = responseFuture.waitResponse();
|
||||
if(result == null){
|
||||
if(responseFuture.isSendOK()){
|
||||
throw new RemotingTimeoutException(address.toString(), timeoutMillis, responseFuture.getCause());
|
||||
} else{
|
||||
throw new RemoteException(address.toString(), responseFuture.getCause());
|
||||
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
|
||||
channel.writeAndFlush(command).addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
if(future.isSuccess()){
|
||||
responseFuture.setSendOk(true);
|
||||
return;
|
||||
} else {
|
||||
responseFuture.setSendOk(false);
|
||||
}
|
||||
responseFuture.setCause(future.cause());
|
||||
responseFuture.putResponse(null);
|
||||
logger.error("send command {} to address {} failed", command, address);
|
||||
}
|
||||
});
|
||||
Command result = responseFuture.waitResponse();
|
||||
if(result == null){
|
||||
if(responseFuture.isSendOK()){
|
||||
throw new RemotingTimeoutException(address.toString(), timeoutMillis, responseFuture.getCause());
|
||||
} else{
|
||||
throw new RemotingException(address.toString(), responseFuture.getCause());
|
||||
}
|
||||
return result;
|
||||
} catch (Exception ex) {
|
||||
String msg = String.format("send command %s to address %s error", command, address);
|
||||
throw new RemotingException(msg, ex);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public Channel getChannel(Address address) {
|
||||
|
|
@ -192,10 +211,6 @@ public class NettyRemotingClient {
|
|||
return null;
|
||||
}
|
||||
|
||||
public ExecutorService getDefaultExecutor() {
|
||||
return defaultExecutor;
|
||||
}
|
||||
|
||||
public void close() {
|
||||
if(isStarted.compareAndSet(true, false)){
|
||||
try {
|
||||
|
|
@ -203,8 +218,8 @@ public class NettyRemotingClient {
|
|||
if(workerGroup != null){
|
||||
this.workerGroup.shutdownGracefully();
|
||||
}
|
||||
if(defaultExecutor != null){
|
||||
defaultExecutor.shutdown();
|
||||
if(callbackExecutor != null){
|
||||
this.callbackExecutor.shutdownNow();
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
logger.error("netty client close exception", ex);
|
||||
|
|
|
|||
|
|
@ -1,8 +1,23 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.dolphinscheduler.remote.exceptions;
|
||||
|
||||
/**
|
||||
* @Author: Tboy
|
||||
*/
|
||||
|
||||
public class RemotingTimeoutException extends RemotingException{
|
||||
|
||||
public RemotingTimeoutException(String message) {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,24 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.dolphinscheduler.remote.exceptions;
|
||||
|
||||
public class RemotingTooMuchRequestException extends RemotingException{
|
||||
|
||||
public RemotingTooMuchRequestException(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
||||
|
|
@ -1,7 +1,23 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.dolphinscheduler.remote.future;
|
||||
|
||||
/**
|
||||
* @Author: Tboy
|
||||
* invoke callback
|
||||
*/
|
||||
public interface InvokeCallback {
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,41 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.dolphinscheduler.remote.future;
|
||||
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* release semaphore
|
||||
*/
|
||||
public class ReleaseSemaphore {
|
||||
|
||||
private final Semaphore semaphore;
|
||||
|
||||
private final AtomicBoolean released;
|
||||
|
||||
public ReleaseSemaphore(Semaphore semaphore){
|
||||
this.semaphore = semaphore;
|
||||
this.released = new AtomicBoolean(false);
|
||||
}
|
||||
|
||||
public void release(){
|
||||
if(this.released.compareAndSet(false, true)){
|
||||
this.semaphore.release();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,13 +1,28 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.dolphinscheduler.remote.future;
|
||||
|
||||
import org.apache.dolphinscheduler.remote.command.Command;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
/**
|
||||
* @Author: Tboy
|
||||
* response future
|
||||
*/
|
||||
public class ResponseFuture {
|
||||
|
||||
|
|
@ -19,6 +34,8 @@ public class ResponseFuture {
|
|||
|
||||
private final InvokeCallback invokeCallback;
|
||||
|
||||
private final ReleaseSemaphore releaseSemaphore;
|
||||
|
||||
private final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
private final long beginTimestamp = System.currentTimeMillis();
|
||||
|
|
@ -29,11 +46,11 @@ public class ResponseFuture {
|
|||
|
||||
private volatile Throwable cause;
|
||||
|
||||
|
||||
public ResponseFuture(long opaque, long timeoutMillis, InvokeCallback invokeCallback) {
|
||||
public ResponseFuture(long opaque, long timeoutMillis, InvokeCallback invokeCallback, ReleaseSemaphore releaseSemaphore) {
|
||||
this.opaque = opaque;
|
||||
this.timeoutMillis = timeoutMillis;
|
||||
this.invokeCallback = invokeCallback;
|
||||
this.releaseSemaphore = releaseSemaphore;
|
||||
FUTURE_TABLE.put(opaque, this);
|
||||
}
|
||||
|
||||
|
|
@ -95,7 +112,17 @@ public class ResponseFuture {
|
|||
return responseCommand;
|
||||
}
|
||||
|
||||
public void setResponseCommand(Command responseCommand) {
|
||||
this.responseCommand = responseCommand;
|
||||
}
|
||||
|
||||
public InvokeCallback getInvokeCallback() {
|
||||
return invokeCallback;
|
||||
}
|
||||
|
||||
public void release() {
|
||||
if(this.releaseSemaphore != null){
|
||||
this.releaseSemaphore.release();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,6 +24,8 @@ import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
/**
|
||||
* netty client request handler
|
||||
*/
|
||||
|
|
@ -34,8 +36,11 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
|
|||
|
||||
private final NettyRemotingClient nettyRemotingClient;
|
||||
|
||||
public NettyClientHandler(NettyRemotingClient nettyRemotingClient){
|
||||
private final ExecutorService callbackExecutor;
|
||||
|
||||
public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor){
|
||||
this.nettyRemotingClient = nettyRemotingClient;
|
||||
this.callbackExecutor = callbackExecutor;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -52,8 +57,18 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
|
|||
private void processReceived(final Command responseCommand) {
|
||||
ResponseFuture future = ResponseFuture.getFuture(responseCommand.getOpaque());
|
||||
if(future != null){
|
||||
future.putResponse(responseCommand);
|
||||
future.executeInvokeCallback();
|
||||
future.setResponseCommand(responseCommand);
|
||||
future.release();
|
||||
if(future.getInvokeCallback() != null){
|
||||
this.callbackExecutor.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
future.executeInvokeCallback();
|
||||
}
|
||||
});
|
||||
} else{
|
||||
future.putResponse(responseCommand);
|
||||
}
|
||||
} else{
|
||||
logger.warn("receive response {}, but not matched any request ", responseCommand);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,38 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.dolphinscheduler.remote.utils;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.RejectedExecutionHandler;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
/**
|
||||
* caller thread execute
|
||||
*/
|
||||
public class CallerThreadExecutePolicy implements RejectedExecutionHandler {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(CallerThreadExecutePolicy.class);
|
||||
|
||||
@Override
|
||||
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
|
||||
logger.warn("queue is full, trigger caller thread execute");
|
||||
r.run();
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,47 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.dolphinscheduler.remote.utils;
|
||||
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class NamedThreadFactory implements ThreadFactory {
|
||||
|
||||
private final AtomicInteger increment = new AtomicInteger(1);
|
||||
|
||||
private final String name;
|
||||
|
||||
private final int count;
|
||||
|
||||
public NamedThreadFactory(String name){
|
||||
this(name, 0);
|
||||
}
|
||||
|
||||
public NamedThreadFactory(String name, int count){
|
||||
this.name = name;
|
||||
this.count = count;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
final String threadName = count > 0 ? String.format(name + "_%d_%d", count, increment.getAndIncrement())
|
||||
: String.format(name + "_%d", increment.getAndIncrement());
|
||||
Thread t = new Thread(r, threadName);
|
||||
t.setDaemon(true);
|
||||
return t;
|
||||
}
|
||||
}
|
||||
|
|
@ -24,6 +24,8 @@ import org.apache.dolphinscheduler.remote.command.Ping;
|
|||
import org.apache.dolphinscheduler.remote.command.Pong;
|
||||
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
|
||||
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
|
||||
import org.apache.dolphinscheduler.remote.future.InvokeCallback;
|
||||
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
|
||||
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
|
||||
import org.apache.dolphinscheduler.remote.utils.Address;
|
||||
import org.junit.Assert;
|
||||
|
|
@ -36,7 +38,33 @@ public class NettyRemotingClientTest {
|
|||
|
||||
|
||||
@Test
|
||||
public void testSend(){
|
||||
public void testSendSync(){
|
||||
NettyServerConfig serverConfig = new NettyServerConfig();
|
||||
|
||||
NettyRemotingServer server = new NettyRemotingServer(serverConfig);
|
||||
server.registerProcessor(CommandType.PING, new NettyRequestProcessor() {
|
||||
@Override
|
||||
public void process(Channel channel, Command command) {
|
||||
channel.writeAndFlush(Pong.create(command.getOpaque()));
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
server.start();
|
||||
//
|
||||
final NettyClientConfig clientConfig = new NettyClientConfig();
|
||||
NettyRemotingClient client = new NettyRemotingClient(clientConfig);
|
||||
Command commandPing = Ping.create();
|
||||
try {
|
||||
Command response = client.sendSync(new Address("127.0.0.1", serverConfig.getListenPort()), commandPing, 2000);
|
||||
Assert.assertEquals(commandPing.getOpaque(), response.getOpaque());
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendAsync(){
|
||||
NettyServerConfig serverConfig = new NettyServerConfig();
|
||||
|
||||
NettyRemotingServer server = new NettyRemotingServer(serverConfig);
|
||||
|
|
@ -50,10 +78,19 @@ public class NettyRemotingClientTest {
|
|||
//
|
||||
final NettyClientConfig clientConfig = new NettyClientConfig();
|
||||
NettyRemotingClient client = new NettyRemotingClient(clientConfig);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
Command commandPing = Ping.create();
|
||||
try {
|
||||
Command response = client.sendSync(new Address("127.0.0.1", serverConfig.getListenPort()), commandPing, 2000);
|
||||
Assert.assertEquals(commandPing.getOpaque(), response.getOpaque());
|
||||
final AtomicLong opaque = new AtomicLong(0);
|
||||
client.sendAsync(new Address("127.0.0.1", serverConfig.getListenPort()), commandPing, 2000, new InvokeCallback() {
|
||||
@Override
|
||||
public void operationComplete(ResponseFuture responseFuture) {
|
||||
opaque.set(responseFuture.getOpaque());
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
latch.await();
|
||||
Assert.assertEquals(commandPing.getOpaque(), opaque.get());
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue