에이전트-콘솔 메시지 송수신 로직 개선

main
semin.baek 5 months ago
parent 10075649a1
commit 22477dc9bd

@ -27,6 +27,8 @@ public class AckDto {
RECEIVE_SUCCESS,
RECEIVE_FAIL,
PROCESS_SUCCESS,
PROCESS_FAIL
PROCESS_FAIL,
TRANSFER_SUCCESS,
TRANSFER_FAIL
}
}

@ -28,6 +28,7 @@ public class ReceiveMessageDto implements Serializable {
private ProcessStatus processStatus;
public static enum ProcessStatus {
PROCESS_SEND,
PROCESS_RECEIVED,
PROCESS_DONE,
PROCESS_NOT_POSSIBLE,

@ -48,10 +48,6 @@ public class ListenerService {
AckDto ackDto = null;
try {
ReceiveMessageDto receiveMessageDto = MessageUtils.toReceiveMessageDto(messageJsonString);
log.debug("[todayDebug] receiveMessageDto.getRecipientHostId(): {}", receiveMessageDto.getRecipientHostId());
log.debug("[todayDebug] receiveMessageDto.getRoutingHostList(): {}", receiveMessageDto.getRoutingHostList());
log.debug("[todayDebug] receiveMessageDto.getRoutingHostList().get(0).getClass().getName(): {}", receiveMessageDto.getRoutingHostList().get(0).getClass().getName());
log.debug("[todayDebug] receiveMessageDto.getRoutingHostList().size() - 1: {}", receiveMessageDto.getRoutingHostList().size() - 1);
String lastRoutingHostId = receiveMessageDto.getRoutingHostList().get(receiveMessageDto.getRoutingHostList().size() - 1).getHostId();
if(receiveMessageDto.getRecipientHostId().equals(this.agentConfigReader.getAgentConfigDto().getMyHostId()) || lastRoutingHostId.equals(this.agentConfigReader.getAgentConfigDto().getMyHostId())) {
if(dropBoxService.isExistToday(receiveMessageDto)) {
@ -63,8 +59,8 @@ public class ListenerService {
}
}
else {
MessageUtils.transfer(this.agentConfigReader, receiveMessageDto);
ackDto = AckDto.builder().result(AckDto.ResultType.RECEIVE_SUCCESS).build();
AckDto transfarAckDto = MessageUtils.transfer(this.agentConfigReader, receiveMessageDto);
ackDto = AckDto.builder().result(transfarAckDto.getResult()).build();
}
} catch (IllegalMessageException | NullMessageException | InCompleteMessageException | JsonProcessingException e) {
log.error("{}", e, e);

@ -61,6 +61,16 @@ public class MessageUtils {
throw new InCompleteMessageException("senderTimestamp 값의 형식이 숫자형식이 아닙니다. " + senderTimeStampString);
}
}
String senderPostmanId;
if(map.get("senderPostmanId") == null) {
throw new InCompleteMessageException("senderPostmanId 엘리먼트를 찾을 수 없습니다.");
}
else if(StringUtils.isBlank(String.valueOf(map.get("senderPostmanId")))) {
throw new InCompleteMessageException("senderPostmanId 값을 찾을 수 없습니다.");
}
else {
senderPostmanId = String.valueOf(map.get("senderPostmanId"));
}
String messageUuid;
if(map.get("messageUuid") == null) {
throw new InCompleteMessageException("messageUuid 엘리먼트를 찾을 수 없습니다.");
@ -131,8 +141,6 @@ public class MessageUtils {
throw new InCompleteMessageException("routingHostList 엘리먼트를 찾을 수 없습니다.");
}
else {
log.debug("[todayDebug] map.get(\"routingHostList\").getClass().getName(): {}", map.get("routingHostList").getClass().getName());
log.debug("[todayDebug] ((List) map.get(\"routingHostList\")).get(0).getClass().getName(): {}", ((List) map.get("routingHostList")).get(0).getClass().getName());
List<Map<String, Object>> mapList = (List<Map<String, Object>>) map.get("routingHostList");
routingHostList = new ArrayList<>();
for(Map<String, Object> map1 : mapList) {
@ -150,12 +158,22 @@ public class MessageUtils {
else {
dataString = String.valueOf(map.get("data"));
}
ReceiveMessageDto.ProcessStatus processStatus;
if(map.get("processStatus") == null) {
throw new InCompleteMessageException("processStatus 엘리먼트를 찾을 수 없습니다.");
}
else if(StringUtils.isBlank(String.valueOf(map.get("processStatus")))) {
throw new InCompleteMessageException("processStatus 값을 찾을 수 없습니다.");
}
else {
processStatus = EnumUtils.getEnum(ReceiveMessageDto.ProcessStatus.class, String.valueOf(map.get("processStatus")));
}
receiveMessageDto = ReceiveMessageDto.builder()
.senderHostId(senderHostId).senderTimestamp(senderTimestamp)
.senderHostId(senderHostId).senderTimestamp(senderTimestamp).senderPostmanId(senderPostmanId)
.messageUuid(messageUuid).messageType(messageType).receivedTimestamp(receivedTimestamp)
.recipientHostId(recipientHostId).recipientDropBoxId(recipientDropBoxId)
.routingHostList(routingHostList)
.data(dataString).attachFileList(new ArrayList<>()).processStatus(ReceiveMessageDto.ProcessStatus.PROCESS_RECEIVED)
.data(dataString).attachFileList(new ArrayList<>()).processStatus(processStatus)
.build();
}
catch(JsonProcessingException e) {

@ -95,22 +95,25 @@ public class PostmanSchedulerService {
.senderPostmanId(postman.getPostmanId()).messageUuid(messageUuid).messageType(AgentConfigDto.MessageType.TRANSFER_DB_TO_DB)
.recipientHostId(postman.getRecipientHostId()).recipientDropBoxId(postman.getRecipientDropBoxId())
.routingHostList(routingHostList)
.data(dataString).build();
.data(dataString).processStatus(ReceiveMessageDto.ProcessStatus.PROCESS_SEND).build();
log.debug("postman to {} send a message UUID {} (data count: {})", receiveMessageDto.getRecipientHostId(), receiveMessageDto.getMessageUuid(), dataMapList.size());
AckDto ackDto = MessageUtils.send(this.agentConfigReader, postman.getPostmanId(), receiveMessageDto);
log.debug("postman received ack from {} ack: {}", receiveMessageDto.getRecipientHostId(), ackDto);
if(AckDto.ResultType.RECEIVE_SUCCESS != ackDto.getResult()) {
log.warn("대상 agent {}[{}]에게 전송하였으나 상대방이 수신하지 못하였습니다. 응답: {} 응답메시지: {}"
, knownAgent.getHostId(), knownAgent.getHostName(), ackDto.getResult(), ackDto.getResultText());
}
else {
if(ackDto.getResult() == AckDto.ResultType.RECEIVE_SUCCESS) {
receiveMessageDto.setProcessStatus(ReceiveMessageDto.ProcessStatus.PROCESS_RECEIVED);
receiveMessageDto.setReceivedTimestamp(System.currentTimeMillis());
String postProcessingSqlId = postman.getMessage().getPostProcessingSqlId();
sqlExecuteService.update(dataSourceId, postProcessingSqlId, new HashMap<String, Object>());
}
if(agentConfigReader.isConnectedConsole()) {
ReceiveMessageDto.ProcessStatus processStatus = AckDto.ResultType.RECEIVE_SUCCESS == ackDto.getResult() ? ReceiveMessageDto.ProcessStatus.PROCESS_RECEIVED : ReceiveMessageDto.ProcessStatus.PROCESS_FAIL;
receiveMessageDto.setProcessStatus(processStatus);
else if(ackDto.getResult() == AckDto.ResultType.TRANSFER_SUCCESS) {
receiveMessageDto.setReceivedTimestamp(System.currentTimeMillis());
receiveMessageDto.setProcessStatus(ReceiveMessageDto.ProcessStatus.PROCESS_SEND);
}
else if(ackDto.getResult() == AckDto.ResultType.RECEIVE_FAIL || ackDto.getResult() == AckDto.ResultType.TRANSFER_FAIL) {
log.warn("대상 agent {}[{}]에게 전송하였으나 상대방이 수신하지 못하였습니다. 응답: {} 응답메시지: {}", knownAgent.getHostId(), knownAgent.getHostName(), ackDto.getResult(), ackDto.getResultText());
receiveMessageDto.setProcessStatus(ReceiveMessageDto.ProcessStatus.PROCESS_FAIL);
}
if(agentConfigReader.isConnectedConsole()) {
MessageUtils.announceMessageHistory(this.agentConfigReader, receiveMessageDto);
}
}

Loading…
Cancel
Save