Listener 메시지 수신 작업 완료 (메시지 처리는 이제 해야 함)

main
semin.baek 11 months ago
parent 4b91f5fe1a
commit 6d3c29787c

@ -7,11 +7,13 @@ CREATE TABLE TB_HOST (
CREATE TABLE TB_RECEIVE_MESSAGE ( CREATE TABLE TB_RECEIVE_MESSAGE (
SENDER_HOST_ID VARCHAR(512) NOT NULL SENDER_HOST_ID VARCHAR(512) NOT NULL
, SENDER_TIMESTAMP TIMESTAMP NOT NULL
, MESSAGE_UUID VARCHAR(36) NOT NULL , MESSAGE_UUID VARCHAR(36) NOT NULL
, RECEIVED_TIMESTAMP TIMESTAMP NOT NULL
, MESSAGE_TYPE VARCHAR(128) NOT NULL , MESSAGE_TYPE VARCHAR(128) NOT NULL
, RECEIVED_TIMESTAMP TIMESTAMP NOT NULL
, RECIPIENT_HOST_ID VARCHAR(512) NOT NULL
, RECIPIENT_DROP_BOX VARCHAR(256) NOT NULL
, DATA CLOB , DATA CLOB
, PROCESS_STATUS VARCHAR(128) NOT NULL , PROCESS_STATUS VARCHAR(128) NOT NULL
, PRIMARY KEY (SENDER_HOST_ID, MESSAGE_UUID) , PRIMARY KEY (SENDER_HOST_ID, MESSAGE_UUID)
); );

@ -3,6 +3,8 @@
"host-id": "sam", "host-id": "sam",
"timestamp": 1740643945523 "timestamp": 1740643945523
}, },
"message-uuid": "9b1deb4d-3b7d-4bad-9bdd-2b0d7b3dcb6d",
"message-type": "SAVE_DB_DATA",
"recipient": { "recipient": {
"host-id": "defree", "host-id": "defree",
"drop-box-id": "save-violation-history" "drop-box-id": "save-violation-history"

@ -0,0 +1,6 @@
package com.bsmlab.dfx.agent.config.constant;
public enum MessageType {
SAVE_DB_DATA,
SAVE_FILE;
}

@ -0,0 +1,5 @@
package com.bsmlab.dfx.agent.config.constant;
public enum ProcessStatusType {
READ;
}

@ -8,6 +8,8 @@ import lombok.Setter;
@Setter @Setter
@Builder @Builder
public class AckDto { public class AckDto {
public static final String RESULT_SUCCESS = "SUCCESS";
public static final String RESULT_FAIL = "FAIL";
private String result; private String result;
private String message; private String message;
} }

@ -2,9 +2,8 @@ package com.bsmlab.dfx.agent.listener.dto;
import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Mapper;
import java.util.Map;
@Mapper @Mapper
public interface ListenerMapper { public interface ListenerMapper {
void insertReceiveMessage(Map<String, Object> param); int selectReceiveMessageCountByPk(ReceiveMessageDto receiveMessageDto);
void insertReceiveMessage(ReceiveMessageDto receiveMessageDto);
} }

@ -9,9 +9,12 @@ import lombok.*;
@NoArgsConstructor @NoArgsConstructor
public class ReceiveMessageDto { public class ReceiveMessageDto {
private String senderHostId; private String senderHostId;
private long senderTimestamp;
private String messageUuid; private String messageUuid;
private long receivedTimestamp;
private String messageType; private String messageType;
private long receivedTimestamp;
private String recipientHostId;
private String recipientDropBox;
private String data; private String data;
private String processStatus; private String processStatus;
} }

@ -3,16 +3,14 @@ package com.bsmlab.dfx.agent.listener.service;
import com.bsmlab.dfx.agent.listener.dto.AckDto; import com.bsmlab.dfx.agent.listener.dto.AckDto;
import com.bsmlab.dfx.agent.listener.dto.ListenerMapper; import com.bsmlab.dfx.agent.listener.dto.ListenerMapper;
import com.bsmlab.dfx.agent.listener.dto.ReceiveMessageDto; import com.bsmlab.dfx.agent.listener.dto.ReceiveMessageDto;
import com.fasterxml.jackson.core.JsonProcessingException; import com.bsmlab.dfx.agent.support.MessageUtils;
import com.fasterxml.jackson.core.type.TypeReference; import com.bsmlab.dfx.agent.support.exception.IllegalMessageException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.bsmlab.dfx.agent.support.exception.InCompleteMessageException;
import com.bsmlab.dfx.agent.support.exception.NullMessageException;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
@RequiredArgsConstructor @RequiredArgsConstructor
@Slf4j @Slf4j
@Service @Service
@ -20,16 +18,21 @@ public class ListenerService {
private final ListenerMapper listenerMapper; private final ListenerMapper listenerMapper;
public AckDto saveNewMessage(String messageJsonString) { public AckDto saveNewMessage(String messageJsonString) {
return null; AckDto ackDto = null;
} try {
ReceiveMessageDto receiveMessageDto = MessageUtils.toReceiveMessageDto(messageJsonString);
private ReceiveMessageDto toReceiveMessageDto(String messageJsonString) throws JsonProcessingException { int counter = listenerMapper.selectReceiveMessageCountByPk(receiveMessageDto);
ObjectMapper objectMapper = new ObjectMapper(); if(counter > 0) {
Map<String, String> map = null; ackDto = AckDto.builder().result(AckDto.RESULT_FAIL).message("이전 전송한 메시지 중 중복된 UUID가 존재합니다.").build();
map = objectMapper.readValue(messageJsonString, new TypeReference<HashMap<String, String>>() {}); }
ReceiveMessageDto receiveMessageDto = ReceiveMessageDto.builder() else {
.senderHostId(map.get("")) listenerMapper.insertReceiveMessage(receiveMessageDto);
.build(); ackDto = AckDto.builder().result(AckDto.RESULT_SUCCESS).build();
return null; }
} catch (IllegalMessageException | NullMessageException | InCompleteMessageException e) {
log.error("{}", e, e);
ackDto = AckDto.builder().result(AckDto.RESULT_FAIL).message(e.getLocalizedMessage()).build();
}
return ackDto;
} }
} }

@ -0,0 +1,133 @@
package com.bsmlab.dfx.agent.support;
import com.bsmlab.dfx.agent.config.constant.MessageType;
import com.bsmlab.dfx.agent.config.constant.ProcessStatusType;
import com.bsmlab.dfx.agent.listener.dto.ReceiveMessageDto;
import com.bsmlab.dfx.agent.support.exception.IllegalMessageException;
import com.bsmlab.dfx.agent.support.exception.InCompleteMessageException;
import com.bsmlab.dfx.agent.support.exception.NullMessageException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.EnumUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
public class MessageUtils {
private MessageUtils() {};
public static ReceiveMessageDto toReceiveMessageDto(String messageJsonString) throws IllegalMessageException, NullMessageException, InCompleteMessageException {
ReceiveMessageDto receiveMessageDto = null;
ObjectMapper objectMapper = new ObjectMapper();
Map<String, Object> map = null;
try {
map = objectMapper.readValue(messageJsonString, new TypeReference<HashMap<String, Object>>() {});
if(map == null) {
throw new NullMessageException("");
}
if(map.get("sender") == null) {
throw new InCompleteMessageException("sender 엘리먼트를 찾을 수 없습니다.");
}
else if(map.get("sender") instanceof Map){
throw new InCompleteMessageException("sender 엘리먼트의 데이터가 객체타입이 아닙니다.");
}
Map<String, Object> senderMap = (Map<String, Object>)map.get("sender");
String senderHostId;
if(senderMap.get("host-id") == null) {
throw new InCompleteMessageException("sender.host-id 엘리먼트를 찾을 수 없습니다.");
}
else if(StringUtils.isBlank(String.valueOf(senderMap.get("host-id")))) {
throw new InCompleteMessageException("sender.host-id 값을 찾을 수 없습니다.");
}
else {
senderHostId = String.valueOf(senderMap.get("host-id"));
}
long senderTimestamp = 0;
if(senderMap.get("timestamp") == null) {
throw new InCompleteMessageException("sender.timestamp 엘리먼트를 찾을 수 없습니다.");
}
else if(StringUtils.isBlank(String.valueOf(senderMap.get("timestamp")))) {
throw new InCompleteMessageException("sender.timestamp 값을 찾을 수 없습니다.");
}
else {
String senderTimeStampString = String.valueOf(senderMap.get("host-id"));
try {
senderTimestamp = Long.parseLong(senderTimeStampString);
} catch (NumberFormatException e) {
throw new InCompleteMessageException("sender.timestamp 값의 형식이 숫자형식이 아닙니다. " + senderTimeStampString);
}
}
String messageUuid;
if(map.get("message-uuid") == null) {
throw new InCompleteMessageException("message-uuid 엘리먼트를 찾을 수 없습니다.");
}
else if(StringUtils.isBlank(String.valueOf(map.get("message-uuid")))) {
throw new InCompleteMessageException("message-uuid 값을 찾을 수 없습니다.");
}
else {
messageUuid = String.valueOf(map.get("message-uuid"));
try {
UUID.fromString(messageUuid);
}
catch (IllegalArgumentException e) {
throw new InCompleteMessageException("message-uuid 값의 형식이 숫자형식이 아닙니다. " + messageUuid);
}
}
long receivedTimestamp = System.currentTimeMillis();
if(map.get("recipient") == null) {
throw new InCompleteMessageException("recipient 엘리먼트를 찾을 수 없습니다.");
}
else if(map.get("recipient") instanceof Map){
throw new InCompleteMessageException("recipient 엘리먼트의 데이터가 객체타입이 아닙니다.");
}
Map<String, Object> recipientMap = (Map<String, Object>)map.get("recipient");
//TODO recipient
String recipientHostId;
if(recipientMap.get("host-id") == null) {
throw new InCompleteMessageException("recipient.host-id 엘리먼트를 찾을 수 없습니다.");
}
else if(StringUtils.isBlank(String.valueOf(recipientMap.get("host-id")))) {
throw new InCompleteMessageException("recipient.host-id 값을 찾을 수 없습니다.");
}
else {
recipientHostId = String.valueOf(recipientMap.get("host-id"));
}
String recipientDropBoxId;
if(recipientMap.get("drop-box-id") == null) {
throw new InCompleteMessageException("recipient.drop-box-id 엘리먼트를 찾을 수 없습니다.");
}
else if(StringUtils.isBlank(String.valueOf(recipientMap.get("drop-box-id")))) {
throw new InCompleteMessageException("recipient.drop-box-id 값을 찾을 수 없습니다.");
}
else {
recipientDropBoxId = String.valueOf(recipientMap.get("drop-box-id"));
}
String messageType;
if(map.get("message-type") == null) {
throw new InCompleteMessageException("message-type 엘리먼트를 찾을 수 없습니다.");
}
else if(StringUtils.isBlank(String.valueOf(map.get("message-type")))) {
throw new InCompleteMessageException("message-type 값을 찾을 수 없습니다.");
}
else {
messageType = String.valueOf(map.get("message-type"));
if(!EnumUtils.isValidEnum(MessageType.class, messageType)) {
throw new InCompleteMessageException("message-type 값이 옳바르지 않습니다. " + messageType);
}
}
receiveMessageDto = ReceiveMessageDto.builder()
.senderHostId(senderHostId).senderTimestamp(senderTimestamp)
.messageUuid(messageUuid).messageType(messageType).receivedTimestamp(receivedTimestamp)
.recipientHostId(recipientHostId).recipientDropBox(recipientDropBoxId)
.data(messageJsonString).processStatus(ProcessStatusType.READ.name())
.build();
}
catch(JsonProcessingException e) {
throw new IllegalMessageException(e.getMessage());
}
return receiveMessageDto;
}
}

@ -0,0 +1,19 @@
package com.bsmlab.dfx.agent.support.exception;
public class DfxException extends Exception {
protected String additionalMessage;
public DfxException(String additionalMessage) {
this.additionalMessage = additionalMessage;
}
@Override
public String getMessage() {
return "cannot find message contents. " + this.additionalMessage;
}
@Override
public String getLocalizedMessage() {
return "파싱한 메시지의 내용을 찾을 수 없습니다. " + this.additionalMessage;
}
}

@ -0,0 +1,17 @@
package com.bsmlab.dfx.agent.support.exception;
public class IllegalMessageException extends DfxException {
public IllegalMessageException(String additionalMessage) {
super(additionalMessage);
}
@Override
public String getMessage() {
return "cannot parse json message." + this.additionalMessage;
}
@Override
public String getLocalizedMessage() {
return "json 메시지를 파싱할 수 없습니다." + this.additionalMessage;
}
}

@ -0,0 +1,17 @@
package com.bsmlab.dfx.agent.support.exception;
public class InCompleteMessageException extends DfxException {
public InCompleteMessageException(String additionalMessage) {
super(additionalMessage);
}
@Override
public String getMessage() {
return "incomplete message." + this.additionalMessage;
}
@Override
public String getLocalizedMessage() {
return "메시지가 불완전합니다." + this.additionalMessage;
}
}

@ -0,0 +1,17 @@
package com.bsmlab.dfx.agent.support.exception;
public class NullMessageException extends DfxException {
public NullMessageException(String addtitionalMessage) {
super(addtitionalMessage);
}
@Override
public String getMessage() {
return "cannot find message contents." + this.additionalMessage;
}
@Override
public String getLocalizedMessage() {
return "파싱한 메시지의 내용을 찾을 수 없습니다." + this.additionalMessage;
}
}

@ -0,0 +1,10 @@
ooo. ooooo o o .oo o
8 `8. 8 `b d' .P 8 8
8 `8 o8oo `bd' .P 8 .oPYo. .oPYo. odYo. o8P
8 8 8 .PY. oPooo8 8 8 8oooo8 8' `8 8
8 .P 8 .P Y. .P 8 8 8 8. 8 8 8
8ooo' 8 .P Y. .P 8 `YooP8 `Yooo' 8 8 8
.....:::..::::..::::..:::..:::::..:....8 :.....:..::..::..:
::::::::::::::::::::::::::::::::::::ooP'.::::::::::::::::::
::::::::::::::::::::::::::::::::::::...::::::::::::::::::::

@ -1,12 +1,18 @@
<?xml version="1.0" encoding="UTF-8" ?> <?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper> <mapper namespace="com.bsmlab.dfx.agent.listener.dto.ListenerMapper">
<insert id="com.bsmlab.dfx.agent.listener.dto.ListenerMapper.insertReceiveMessage" parameterType="map"> <select id="selectReceiveMessageCountByPk" parameterType="com.bsmlab.dfx.agent.listener.dto.ReceiveMessageDto" resultType="int">
SELECT COUNT(*) AS COUNTER
FROM TB_RECEIVE_MESSAGE
WHERE SENDER_HOST_ID = #{senderHostId}
AND MESSAGE_UUID = #{messageUuid}
</select>
<insert id="insertReceiveMessage" parameterType="com.bsmlab.dfx.agent.listener.dto.ReceiveMessageDto">
INSERT INTO TB_RECEIVE_MESSAGE ( INSERT INTO TB_RECEIVE_MESSAGE (
SENDER_HOST_ID, MESSAGE_UUID, RECEIVED_TIMESTAMP, MESSAGE_TYPE, DATA, PROCESS_STATUS SENDER_HOST_ID, SENDER_TIMESTAMP, MESSAGE_UUID, MESSAGE_TYPE, RECEIVED_TIMESTAMP, RECIPIENT_HOST_ID, RECIPIENT_DROP_BOX, DATA, PROCESS_STATUS
) )
VALUES ( VALUES (
#{SENDER_HOST_ID}, #{MESSAGE_UUID}, #{RECEIVED_TIMESTAMP}, #{MESSAGE_TYPE}, #{DATA}, #{PROCESS_STATUS} #{senderHostId}, #{senderTimestamp}, #{messageUuid}, #{messageType}, #{receivedTimestamp}, #{recipientHostId}, #{recipientDropBox}, #{data: CLOB}, #{processStatus}
); );
</insert> </insert>
</mapper> </mapper>

Loading…
Cancel
Save