add sendSync method
parent
8e154ecbe4
commit
820b84bb46
|
|
@ -25,17 +25,19 @@ import io.netty.channel.socket.nio.NioSocketChannel;
|
|||
import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
|
||||
import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
|
||||
import org.apache.dolphinscheduler.remote.command.Command;
|
||||
import org.apache.dolphinscheduler.remote.command.CommandType;
|
||||
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
|
||||
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
|
||||
import org.apache.dolphinscheduler.remote.exceptions.RemotingTimeoutException;
|
||||
import org.apache.dolphinscheduler.remote.future.InvokeCallback;
|
||||
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
|
||||
import org.apache.dolphinscheduler.remote.handler.NettyClientHandler;
|
||||
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
|
||||
import org.apache.dolphinscheduler.remote.utils.Address;
|
||||
import org.apache.dolphinscheduler.remote.utils.Constants;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.rmi.RemoteException;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
|
@ -101,15 +103,8 @@ public class NettyRemotingClient {
|
|||
isStarted.compareAndSet(false, true);
|
||||
}
|
||||
|
||||
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
|
||||
registerProcessor(commandType, processor, null);
|
||||
}
|
||||
|
||||
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
|
||||
this.clientHandler.registerProcessor(commandType, processor, executor);
|
||||
}
|
||||
|
||||
public void send(final Address address, final Command command) throws RemotingException {
|
||||
//TODO
|
||||
public void send(final Address address, final Command command, final InvokeCallback invokeCallback) throws RemotingException {
|
||||
final Channel channel = getChannel(address);
|
||||
if (channel == null) {
|
||||
throw new RemotingException("network error");
|
||||
|
|
@ -132,17 +127,39 @@ public class NettyRemotingClient {
|
|||
}
|
||||
}
|
||||
|
||||
//TODO
|
||||
public void sendSync(final Address address, final Command command, final long timeoutMillis) throws RemotingException {
|
||||
public Command sendSync(final Address address, final Command command, final long timeoutMillis) throws RemotingException {
|
||||
final Channel channel = getChannel(address);
|
||||
if (channel == null) {
|
||||
throw new RemotingException("network error");
|
||||
throw new RemotingException(String.format("connect to : %s fail", address));
|
||||
}
|
||||
final long opaque = command.getOpaque();
|
||||
try {
|
||||
|
||||
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null);
|
||||
channel.writeAndFlush(command).addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture channelFuture) throws Exception {
|
||||
if(channelFuture.isSuccess()){
|
||||
responseFuture.setSendOk(true);
|
||||
return;
|
||||
} else{
|
||||
responseFuture.setSendOk(false);
|
||||
responseFuture.setCause(channelFuture.cause());
|
||||
responseFuture.putResponse(null);
|
||||
logger.error("send command {} to address {} failed", command, address);
|
||||
}
|
||||
}
|
||||
});
|
||||
Command result = responseFuture.waitResponse();
|
||||
if(result == null){
|
||||
if(responseFuture.isSendOK()){
|
||||
throw new RemotingTimeoutException(address.toString(), timeoutMillis, responseFuture.getCause());
|
||||
} else{
|
||||
throw new RemoteException(address.toString(), responseFuture.getCause());
|
||||
}
|
||||
}
|
||||
return result;
|
||||
} catch (Exception ex) {
|
||||
String msg = String.format("send command %s to address %s encounter error", command, address);
|
||||
String msg = String.format("send command %s to address %s error", command, address);
|
||||
throw new RemotingException(msg, ex);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,7 +16,10 @@
|
|||
*/
|
||||
package org.apache.dolphinscheduler.remote.command;
|
||||
|
||||
import com.sun.org.apache.regexp.internal.RE;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/**
|
||||
* receive task log request command and content fill
|
||||
|
|
@ -24,11 +27,12 @@ import java.io.Serializable;
|
|||
*/
|
||||
public class Command implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
private static final AtomicLong REQUEST_ID = new AtomicLong(1);
|
||||
|
||||
public static final byte MAGIC = (byte) 0xbabe;
|
||||
|
||||
public Command(){
|
||||
this.opaque = REQUEST_ID.getAndIncrement();
|
||||
}
|
||||
|
||||
public Command(long opaque){
|
||||
|
|
|
|||
|
|
@ -1 +1 @@
|
|||
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
public class ExecuteTaskRequestCommand implements Serializable {
private static final AtomicLong REQUEST = new AtomicLong(1);
private String taskId;
private String attemptId;
private String applicationName;
private String groupName;
private String taskName;
private int connectorPort;
private String description;
private String className;
private String methodName;
private String params;
private List<Integer> shardItems;
public List<Integer> getShardItems() {
return shardItems;
}
public void setShardItems(List<Integer> shardItems) {
this.shardItems = shardItems;
}
public String getParams() {
return params;
}
public void setParams(String params) {
this.params = params;
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
public String getApplicationName() {
return applicationName;
}
public void setApplicationName(String applicationName) {
this.applicationName = applicationName;
}
public String getGroupName() {
return groupName;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
public int getConnectorPort() {
return connectorPort;
}
public void setConnectorPort(int connectorPort) {
this.connectorPort = connectorPort;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Command convert2Command(){
Command command = new Command(REQUEST.getAndIncrement());
command.setType(CommandType.EXECUTE_TASK_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
|
||||
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
public class ExecuteTaskRequestCommand implements Serializable {
private String taskId;
private String attemptId;
private String applicationName;
private String groupName;
private String taskName;
private int connectorPort;
private String description;
private String className;
private String methodName;
private String params;
private List<Integer> shardItems;
public List<Integer> getShardItems() {
return shardItems;
}
public void setShardItems(List<Integer> shardItems) {
this.shardItems = shardItems;
}
public String getParams() {
return params;
}
public void setParams(String params) {
this.params = params;
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
public String getApplicationName() {
return applicationName;
}
public void setApplicationName(String applicationName) {
this.applicationName = applicationName;
}
public String getGroupName() {
return groupName;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
public int getConnectorPort() {
return connectorPort;
}
public void setConnectorPort(int connectorPort) {
this.connectorPort = connectorPort;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.EXECUTE_TASK_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
|
||||
|
|
@ -1 +1 @@
|
|||
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
public class ExecuteTaskResponseCommand implements Serializable {
private static final AtomicLong REQUEST = new AtomicLong(1);
private String taskId;
private String attemptId;
private Object result;
private long receivedTime;
private int executeCount;
private long executeTime;
public String getAttemptId() {
return attemptId;
}
public void setAttemptId(String attemptId) {
this.attemptId = attemptId;
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
public Object getResult() {
return result;
}
public void setResult(Object result) {
this.result = result;
}
public long getReceivedTime() {
return receivedTime;
}
public void setReceivedTime(long receivedTime) {
this.receivedTime = receivedTime;
}
public int getExecuteCount() {
return executeCount;
}
public void setExecuteCount(int executeCount) {
this.executeCount = executeCount;
}
public long getExecuteTime() {
return executeTime;
}
public void setExecuteTime(long executeTime) {
this.executeTime = executeTime;
}
public Command convert2Command(){
Command command = new Command(REQUEST.getAndIncrement());
command.setType(CommandType.EXECUTE_TASK_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
|
||||
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
public class ExecuteTaskResponseCommand implements Serializable {
private String taskId;
private String attemptId;
private Object result;
private long receivedTime;
private int executeCount;
private long executeTime;
public String getAttemptId() {
return attemptId;
}
public void setAttemptId(String attemptId) {
this.attemptId = attemptId;
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
public Object getResult() {
return result;
}
public void setResult(Object result) {
this.result = result;
}
public long getReceivedTime() {
return receivedTime;
}
public void setReceivedTime(long receivedTime) {
this.receivedTime = receivedTime;
}
public int getExecuteCount() {
return executeCount;
}
public void setExecuteCount(int executeCount) {
this.executeCount = executeCount;
}
public long getExecuteTime() {
return executeTime;
}
public void setExecuteTime(long executeTime) {
this.executeTime = executeTime;
}
public Command convert2Command(long opaque){
Command command = new Command();
command.setType(CommandType.EXECUTE_TASK_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
|
||||
|
|
@ -26,8 +26,6 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||
|
||||
public class Ping implements Serializable {
|
||||
|
||||
private static final AtomicLong ID = new AtomicLong(1);
|
||||
|
||||
protected static ByteBuf EMPTY_BODY = Unpooled.EMPTY_BUFFER;
|
||||
|
||||
private static byte[] EMPTY_BODY_ARRAY = new byte[0];
|
||||
|
|
@ -49,7 +47,7 @@ public class Ping implements Serializable {
|
|||
}
|
||||
|
||||
public static Command create(){
|
||||
Command command = new Command(ID.getAndIncrement());
|
||||
Command command = new Command();
|
||||
command.setType(CommandType.PING);
|
||||
command.setBody(EMPTY_BODY_ARRAY);
|
||||
return command;
|
||||
|
|
|
|||
|
|
@ -29,8 +29,6 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||
*/
|
||||
public class GetLogBytesRequestCommand implements Serializable {
|
||||
|
||||
private static final AtomicLong REQUEST = new AtomicLong(1);
|
||||
|
||||
private String path;
|
||||
|
||||
public GetLogBytesRequestCommand() {
|
||||
|
|
@ -53,7 +51,7 @@ public class GetLogBytesRequestCommand implements Serializable {
|
|||
* @return
|
||||
*/
|
||||
public Command convert2Command(){
|
||||
Command command = new Command(REQUEST.getAndIncrement());
|
||||
Command command = new Command();
|
||||
command.setType(CommandType.GET_LOG_BYTES_REQUEST);
|
||||
byte[] body = FastJsonSerializer.serialize(this);
|
||||
command.setBody(body);
|
||||
|
|
|
|||
|
|
@ -29,8 +29,6 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||
*/
|
||||
public class RollViewLogRequestCommand implements Serializable {
|
||||
|
||||
private static final AtomicLong REQUEST = new AtomicLong(1);
|
||||
|
||||
private String path;
|
||||
|
||||
private int skipLineNum;
|
||||
|
|
@ -71,7 +69,7 @@ public class RollViewLogRequestCommand implements Serializable {
|
|||
}
|
||||
|
||||
public Command convert2Command(){
|
||||
Command command = new Command(REQUEST.getAndIncrement());
|
||||
Command command = new Command();
|
||||
command.setType(CommandType.ROLL_VIEW_LOG_REQUEST);
|
||||
byte[] body = FastJsonSerializer.serialize(this);
|
||||
command.setBody(body);
|
||||
|
|
|
|||
|
|
@ -29,8 +29,6 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||
*/
|
||||
public class ViewLogRequestCommand implements Serializable {
|
||||
|
||||
private static final AtomicLong REQUEST = new AtomicLong(1);
|
||||
|
||||
private String path;
|
||||
|
||||
public ViewLogRequestCommand() {
|
||||
|
|
@ -49,7 +47,7 @@ public class ViewLogRequestCommand implements Serializable {
|
|||
}
|
||||
|
||||
public Command convert2Command(){
|
||||
Command command = new Command(REQUEST.getAndIncrement());
|
||||
Command command = new Command();
|
||||
command.setType(CommandType.VIEW_WHOLE_LOG_REQUEST);
|
||||
byte[] body = FastJsonSerializer.serialize(this);
|
||||
command.setBody(body);
|
||||
|
|
|
|||
|
|
@ -0,0 +1,20 @@
|
|||
package org.apache.dolphinscheduler.remote.exceptions;
|
||||
|
||||
/**
|
||||
* @Author: Tboy
|
||||
*/
|
||||
public class RemotingTimeoutException extends RemotingException{
|
||||
|
||||
public RemotingTimeoutException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
|
||||
public RemotingTimeoutException(String address, long timeoutMillis) {
|
||||
this(address, timeoutMillis, null);
|
||||
}
|
||||
|
||||
public RemotingTimeoutException(String address, long timeoutMillis, Throwable cause) {
|
||||
super(String.format("wait response on the channel %s timeout %s", address, timeoutMillis), cause);
|
||||
}
|
||||
}
|
||||
|
|
@ -2,28 +2,100 @@ package org.apache.dolphinscheduler.remote.future;
|
|||
|
||||
import org.apache.dolphinscheduler.remote.command.Command;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* @Author: Tboy
|
||||
*/
|
||||
public class ResponseFuture {
|
||||
|
||||
private final int opaque;
|
||||
private final static ConcurrentHashMap<Long,ResponseFuture> FUTURE_TABLE = new ConcurrentHashMap<>(256);
|
||||
|
||||
private final long opaque;
|
||||
|
||||
private final long timeoutMillis;
|
||||
|
||||
private final InvokeCallback invokeCallback;
|
||||
|
||||
private final long beginTimestamp = System.currentTimeMillis();
|
||||
|
||||
private final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
public ResponseFuture(int opaque, long timeoutMillis, InvokeCallback invokeCallback) {
|
||||
private final long beginTimestamp = System.currentTimeMillis();
|
||||
|
||||
private volatile Command responseCommand;
|
||||
|
||||
private volatile boolean sendOk = true;
|
||||
|
||||
private volatile Throwable cause;
|
||||
|
||||
|
||||
public ResponseFuture(long opaque, long timeoutMillis, InvokeCallback invokeCallback) {
|
||||
this.opaque = opaque;
|
||||
this.timeoutMillis = timeoutMillis;
|
||||
this.invokeCallback = invokeCallback;
|
||||
FUTURE_TABLE.put(opaque, this);
|
||||
}
|
||||
|
||||
public Command waitResponse() throws InterruptedException {
|
||||
this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
|
||||
return this.responseCommand;
|
||||
}
|
||||
|
||||
public void putResponse(final Command responseCommand) {
|
||||
this.responseCommand = responseCommand;
|
||||
this.latch.countDown();
|
||||
FUTURE_TABLE.remove(opaque);
|
||||
}
|
||||
|
||||
public static ResponseFuture getFuture(long opaque){
|
||||
return FUTURE_TABLE.get(opaque);
|
||||
}
|
||||
|
||||
public boolean isTimeout() {
|
||||
long diff = System.currentTimeMillis() - this.beginTimestamp;
|
||||
return diff > this.timeoutMillis;
|
||||
}
|
||||
|
||||
public void executeInvokeCallback() {
|
||||
if (invokeCallback != null) {
|
||||
invokeCallback.operationComplete(this);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isSendOK() {
|
||||
return sendOk;
|
||||
}
|
||||
|
||||
public void setSendOk(boolean sendOk) {
|
||||
this.sendOk = sendOk;
|
||||
}
|
||||
|
||||
public void setCause(Throwable cause) {
|
||||
this.cause = cause;
|
||||
}
|
||||
|
||||
public Throwable getCause() {
|
||||
return cause;
|
||||
}
|
||||
|
||||
public long getOpaque() {
|
||||
return opaque;
|
||||
}
|
||||
|
||||
public long getTimeoutMillis() {
|
||||
return timeoutMillis;
|
||||
}
|
||||
|
||||
public long getBeginTimestamp() {
|
||||
return beginTimestamp;
|
||||
}
|
||||
|
||||
public Command getResponseCommand() {
|
||||
return responseCommand;
|
||||
}
|
||||
|
||||
public InvokeCallback getInvokeCallback() {
|
||||
return invokeCallback;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,17 +19,11 @@ package org.apache.dolphinscheduler.remote.handler;
|
|||
import io.netty.channel.*;
|
||||
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
|
||||
import org.apache.dolphinscheduler.remote.command.Command;
|
||||
import org.apache.dolphinscheduler.remote.command.CommandType;
|
||||
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
|
||||
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
|
||||
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
|
||||
import org.apache.dolphinscheduler.remote.utils.Pair;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
|
||||
/**
|
||||
* netty client request handler
|
||||
*/
|
||||
|
|
@ -40,8 +34,6 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
|
|||
|
||||
private final NettyRemotingClient nettyRemotingClient;
|
||||
|
||||
private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors = new ConcurrentHashMap();
|
||||
|
||||
public NettyClientHandler(NettyRemotingClient nettyRemotingClient){
|
||||
this.nettyRemotingClient = nettyRemotingClient;
|
||||
}
|
||||
|
|
@ -54,42 +46,16 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
|
|||
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
processReceived(ctx.channel(), (Command)msg);
|
||||
processReceived((Command)msg);
|
||||
}
|
||||
|
||||
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
|
||||
this.registerProcessor(commandType, processor, nettyRemotingClient.getDefaultExecutor());
|
||||
}
|
||||
|
||||
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
|
||||
ExecutorService executorRef = executor;
|
||||
if(executorRef == null){
|
||||
executorRef = nettyRemotingClient.getDefaultExecutor();
|
||||
}
|
||||
this.processors.putIfAbsent(commandType, new Pair<NettyRequestProcessor, ExecutorService>(processor, executorRef));
|
||||
}
|
||||
|
||||
private void processReceived(final Channel channel, final Command msg) {
|
||||
final CommandType commandType = msg.getType();
|
||||
final Pair<NettyRequestProcessor, ExecutorService> pair = processors.get(commandType);
|
||||
if (pair != null) {
|
||||
Runnable r = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
pair.getLeft().process(channel, msg);
|
||||
} catch (Throwable ex) {
|
||||
logger.error("process msg {} error : {}", msg, ex);
|
||||
}
|
||||
}
|
||||
};
|
||||
try {
|
||||
pair.getRight().submit(r);
|
||||
} catch (RejectedExecutionException e) {
|
||||
logger.warn("thread pool is full, discard msg {} from {}", msg, ChannelUtils.getRemoteAddress(channel));
|
||||
}
|
||||
} else {
|
||||
logger.warn("commandType {} not support", commandType);
|
||||
private void processReceived(final Command responseCommand) {
|
||||
ResponseFuture future = ResponseFuture.getFuture(responseCommand.getOpaque());
|
||||
if(future != null){
|
||||
future.putResponse(responseCommand);
|
||||
future.executeInvokeCallback();
|
||||
} else{
|
||||
logger.warn("receive response {}, but not matched any request ", responseCommand);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -48,24 +48,14 @@ public class NettyRemotingClientTest {
|
|||
});
|
||||
server.start();
|
||||
//
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
AtomicLong opaque = new AtomicLong(1);
|
||||
final NettyClientConfig clientConfig = new NettyClientConfig();
|
||||
NettyRemotingClient client = new NettyRemotingClient(clientConfig);
|
||||
client.registerProcessor(CommandType.PONG, new NettyRequestProcessor() {
|
||||
@Override
|
||||
public void process(Channel channel, Command command) {
|
||||
opaque.set(command.getOpaque());
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
Command commandPing = Ping.create();
|
||||
try {
|
||||
client.send(new Address("127.0.0.1", serverConfig.getListenPort()), commandPing);
|
||||
latch.await();
|
||||
Command response = client.sendSync(new Address("127.0.0.1", serverConfig.getListenPort()), commandPing, 2000);
|
||||
Assert.assertEquals(commandPing.getOpaque(), response.getOpaque());
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
Assert.assertEquals(opaque.get(), commandPing.getOpaque());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,13 +16,10 @@
|
|||
*/
|
||||
package org.apache.dolphinscheduler.service.log;
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
|
||||
import org.apache.dolphinscheduler.remote.command.Command;
|
||||
import org.apache.dolphinscheduler.remote.command.CommandType;
|
||||
import org.apache.dolphinscheduler.remote.command.log.*;
|
||||
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
|
||||
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
|
||||
import org.apache.dolphinscheduler.remote.utils.Address;
|
||||
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
|
||||
import org.slf4j.Logger;
|
||||
|
|
@ -32,7 +29,7 @@ import org.slf4j.LoggerFactory;
|
|||
/**
|
||||
* log client
|
||||
*/
|
||||
public class LogClientService implements NettyRequestProcessor {
|
||||
public class LogClientService {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(LogClientService.class);
|
||||
|
||||
|
|
@ -52,9 +49,6 @@ public class LogClientService implements NettyRequestProcessor {
|
|||
this.clientConfig = new NettyClientConfig();
|
||||
this.clientConfig.setWorkerThreads(4);
|
||||
this.client = new NettyRemotingClient(clientConfig);
|
||||
this.client.registerProcessor(CommandType.ROLL_VIEW_LOG_RESPONSE,this);
|
||||
this.client.registerProcessor(CommandType.VIEW_WHOLE_LOG_RESPONSE, this);
|
||||
this.client.registerProcessor(CommandType.GET_LOG_BYTES_RESPONSE, this);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -81,9 +75,12 @@ public class LogClientService implements NettyRequestProcessor {
|
|||
final Address address = new Address(host, port);
|
||||
try {
|
||||
Command command = request.convert2Command();
|
||||
this.client.send(address, command);
|
||||
LogPromise promise = new LogPromise(command.getOpaque(), logRequestTimeout);
|
||||
result = ((String)promise.getResult());
|
||||
Command response = this.client.sendSync(address, command, logRequestTimeout);
|
||||
if(response != null){
|
||||
RollViewLogResponseCommand rollReviewLog = FastJsonSerializer.deserialize(
|
||||
command.getBody(), RollViewLogResponseCommand.class);
|
||||
return rollReviewLog.getMsg();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("roll view log error", e);
|
||||
} finally {
|
||||
|
|
@ -106,9 +103,12 @@ public class LogClientService implements NettyRequestProcessor {
|
|||
final Address address = new Address(host, port);
|
||||
try {
|
||||
Command command = request.convert2Command();
|
||||
this.client.send(address, command);
|
||||
LogPromise promise = new LogPromise(command.getOpaque(), logRequestTimeout);
|
||||
result = ((String)promise.getResult());
|
||||
Command response = this.client.sendSync(address, command, logRequestTimeout);
|
||||
if(response != null){
|
||||
ViewLogResponseCommand viewLog = FastJsonSerializer.deserialize(
|
||||
response.getBody(), ViewLogResponseCommand.class);
|
||||
return viewLog.getMsg();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("view log error", e);
|
||||
} finally {
|
||||
|
|
@ -131,9 +131,12 @@ public class LogClientService implements NettyRequestProcessor {
|
|||
final Address address = new Address(host, port);
|
||||
try {
|
||||
Command command = request.convert2Command();
|
||||
this.client.send(address, command);
|
||||
LogPromise promise = new LogPromise(command.getOpaque(), logRequestTimeout);
|
||||
result = (byte[])promise.getResult();
|
||||
Command response = this.client.sendSync(address, command, logRequestTimeout);
|
||||
if(response != null){
|
||||
GetLogBytesResponseCommand getLog = FastJsonSerializer.deserialize(
|
||||
response.getBody(), GetLogBytesResponseCommand.class);
|
||||
return getLog.getData();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("get log size error", e);
|
||||
} finally {
|
||||
|
|
@ -141,28 +144,4 @@ public class LogClientService implements NettyRequestProcessor {
|
|||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(Channel channel, Command command) {
|
||||
logger.info("received log response : {}", command);
|
||||
switch (command.getType()){
|
||||
case ROLL_VIEW_LOG_RESPONSE:
|
||||
RollViewLogResponseCommand rollReviewLog = FastJsonSerializer.deserialize(
|
||||
command.getBody(), RollViewLogResponseCommand.class);
|
||||
LogPromise.notify(command.getOpaque(), rollReviewLog.getMsg());
|
||||
break;
|
||||
case VIEW_WHOLE_LOG_RESPONSE:
|
||||
ViewLogResponseCommand viewLog = FastJsonSerializer.deserialize(
|
||||
command.getBody(), ViewLogResponseCommand.class);
|
||||
LogPromise.notify(command.getOpaque(), viewLog.getMsg());
|
||||
break;
|
||||
case GET_LOG_BYTES_RESPONSE:
|
||||
GetLogBytesResponseCommand getLog = FastJsonSerializer.deserialize(
|
||||
command.getBody(), GetLogBytesResponseCommand.class);
|
||||
LogPromise.notify(command.getOpaque(), getLog.getData());
|
||||
break;
|
||||
default:
|
||||
throw new UnsupportedOperationException(String.format("command type : %s is not supported ", command.getType()));
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue