저장된 ReceiveMessage 처리 중 DB 처리 완료

main
semin.baek 10 months ago
parent 5a8fbd548b
commit 82ad55fc03

@ -33,7 +33,9 @@
],
"sqlMapperLocations": ["D:/projects/bsm-lab/dfx/dfxagent/src/docs/mapper-examples/**/*.xml"],
"drop-box": {
"message-storage-root": "D:/projects/bsm-lab/dfx/dfxagent/src/docs/messages",
"received-message-storage-root": "D:/projects/bsm-lab/dfx/dfxagent/src/docs/messages/received",
"processed-message-storage-root": "D:/projects/bsm-lab/dfx/dfxagent/src/docs/messages/processed",
"failure-message-storage-root": "D:/projects/bsm-lab/dfx/dfxagent/src/docs/messages/failure",
"drop-box-list": [
{
"drop-box-id": "save-violation-history",

@ -34,7 +34,9 @@ public class AgentConfigDto {
@Data
public static class DropBoxConfig {
private String messageStorageRoot;
private String receivedMessageStorageRoot;
private String processedMessageStorageRoot;
private String failureMessageStorageRoot;
private List<DropBox> dropBoxList;
}

@ -170,7 +170,7 @@ public class DfxAgentConfiguration {
}
@Bean(name = "innerSqlSessionFactory")
public SqlSessionFactory innerSqlSessionFactory(@Qualifier("innerDataSource") DataSource dataSource, @Qualifier("settings") Settings settings) throws Exception {
public SqlSessionFactory innerSqlSessionFactory(@Qualifier("innerDataSource") DataSource dataSource) throws Exception {
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dataSource);
sqlSessionFactoryBean.setMapperLocations(

@ -1,126 +0,0 @@
package com.bsmlab.dfx.agent.config;
import com.bsmlab.dfx.agent.config.datasource.DataSourceDto;
import com.bsmlab.dfx.agent.task.dropbox.DropBoxDto;
import com.bsmlab.dfx.agent.task.postman.PostmanDto;
import com.fasterxml.jackson.databind.DatabindException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.core.io.InputStreamResource;
import org.springframework.core.io.Resource;
import org.springframework.stereotype.Component;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
@Component
public class Settings {
private final Map<String, DataSourceDto> dataSourceDtoMap = new HashMap<>();
private String messageStorageRootPath;
private Map<String, DropBoxDto> dropBoxDtoMap = new HashMap<>();
private Resource[] mapperLocations;
private int listenPort = 12345;
private Map<String, PostmanDto> postmanDtoMap;
public void loadSettingFile(String settingFilePath) {
try {
ObjectMapper objectMapper = new ObjectMapper();
Map<String, Object> settingMap = objectMapper.readValue(new File(settingFilePath), Map.class);
this.mapperLocations = this.createMapperLocations(settingMap);
this.listenPort = (Integer)settingMap.get("listen-port");
log.debug("settingMap: {}", settingMap);
this.parseDataSources(settingMap);
this.parseDropBoxes(settingMap);
} catch (DatabindException e) {
log.error("cannot parse a setting file. {}", settingFilePath, e);
log.error(e.getMessage(), e);
} catch (IOException e) {
log.error("cannot read a setting file. {}", settingFilePath);
log.error(e.getMessage(), e);
}
}
@SuppressWarnings("unchecked")
private void parseDataSources(Map<String, Object> settingMap) {
List<Map<String, String>> dataSourceMapList = (List<Map<String, String>>)settingMap.get("datasource");
for(Map<String, String> dataSourceMap : dataSourceMapList) {
DataSourceDto dataSourceDto = DataSourceDto.builder()
.dataSourceId(dataSourceMap.get("dataSourceId")).driverClassName(dataSourceMap.get("driverClassName"))
.url(dataSourceMap.get("url")).username(dataSourceMap.get("username")).password(dataSourceMap.get("password"))
.build();
dataSourceDtoMap.put(dataSourceDto.getDataSourceId(), dataSourceDto);
}
}
private Resource[] createMapperLocations(Map<String, Object> settingMap) throws FileNotFoundException {
Resource[] resources = null;
if(ObjectUtils.isNotEmpty(settingMap.get("mapperLocations"))) {
String[] locationStringList = (String[])settingMap.get("mapperLocations");
resources = new Resource[locationStringList.length];
int i = 0;
for(String location : locationStringList) {
resources[i] = new InputStreamResource(new FileInputStream(new File(location)));
i++;
}
}
return resources;
}
@SuppressWarnings("unchecked")
private void parseDropBoxes(Map<String, Object> settingMap) {
Map<String, Object> dropBoxSettingMap = (Map<String, Object>)settingMap.get("drop-box");
this.messageStorageRootPath = String.valueOf(dropBoxSettingMap.get("message-storage-root"));
File messageStorageRootFile = new File(this.messageStorageRootPath);
if(!messageStorageRootFile.exists()) {
messageStorageRootFile.mkdirs();
}
List<Map<String, String>> dropBoxMapList = (List<Map<String, String>>)dropBoxSettingMap.get("drop-box-list");
for(Map<String, String> dropBoxMap : dropBoxMapList) {
DropBoxDto dropBoxDto = DropBoxDto.builder()
.dropBoxId(dropBoxMap.get("drop-box-id")).taskType(dropBoxMap.get("task-type"))
.dataSourceId(dropBoxMap.get("dataSourceId")).sqlId(dropBoxMap.get("sql-id"))
.saveDirectoryRoot(dropBoxMap.get("save-directory-root"))
.build();
this.dropBoxDtoMap.put(dropBoxDto.getDropBoxId(), dropBoxDto);
}
}
private void parsePostman(Map<String, Object> settingMap) {
Map<String, PostmanDto> postmanDtoMap = new HashMap<>();
Map<String, Object> postmanSettingMap = (Map<String, Object>)settingMap.get("postman");
}
public Resource[] getMapperLocations() {
return this.mapperLocations;
}
public Map<String, DataSourceDto> getDataSourceDtoMap() {
return this.dataSourceDtoMap;
}
private Map<String, DropBoxDto> getDropBoxDtoMap() {
return this.dropBoxDtoMap;
}
public String getMessageStorageRootPath() {
return this.messageStorageRootPath;
}
public DropBoxDto getDropBoxDto(String dropBoxId) {
return this.dropBoxDtoMap.get(dropBoxId);
}
public int getListenPort() {
return this.listenPort;
}
}

@ -1,6 +0,0 @@
package com.bsmlab.dfx.agent.config.constant;
public enum ActionType {
TRIGGER,
SCHEDULED
}

@ -1,6 +0,0 @@
package com.bsmlab.dfx.agent.config.constant;
public enum MessageType {
SAVE_DB_DATA,
SAVE_FILE;
}

@ -1,5 +0,0 @@
package com.bsmlab.dfx.agent.config.constant;
public enum ProcessStatusType {
READ;
}

@ -1,5 +0,0 @@
package com.bsmlab.dfx.agent.config.constant;
public enum TaskType {
DB_READ_THEN_SEND
}

@ -1,5 +1,6 @@
package com.bsmlab.dfx.agent.listener.dto;
import com.bsmlab.dfx.agent.config.AgentConfigDto;
import lombok.*;
import java.io.Serializable;
@ -13,10 +14,17 @@ public class ReceiveMessageDto implements Serializable {
private String senderHostId;
private long senderTimestamp;
private String messageUuid;
private String messageType;
private AgentConfigDto.MessageType messageType;
private long receivedTimestamp;
private String recipientHostId;
private String recipientDropBoxId;
private String data;
private String processStatus;
private ProcessStatus processStatus;
public static enum ProcessStatus {
PROCESS_RECEIVED,
PROCESS_DONE,
PROCESS_NOT_POSSIBLE,
PROCESS_FAIL
}
}

@ -25,6 +25,7 @@ public class ListenerService {
ackDto = AckDto.builder().result(AckDto.ResultType.RECEIVE_FAIL).resultText("금일 전송한 메시지 중 중복된 UUID가 존재합니다.").build();
}
else {
//receiveMessageDto.setProcessStatus();
dropBoxService.add(receiveMessageDto);
ackDto = AckDto.builder().result(AckDto.ResultType.RECEIVE_SUCCESS).build();
}

@ -1,7 +1,6 @@
package com.bsmlab.dfx.agent.support;
import com.bsmlab.dfx.agent.config.constant.MessageType;
import com.bsmlab.dfx.agent.config.constant.ProcessStatusType;
import com.bsmlab.dfx.agent.config.AgentConfigDto;
import com.bsmlab.dfx.agent.listener.dto.ReceiveMessageDto;
import com.bsmlab.dfx.agent.support.exception.IllegalMessageException;
import com.bsmlab.dfx.agent.support.exception.InCompleteMessageException;
@ -106,7 +105,7 @@ public class MessageUtils {
else {
recipientDropBoxId = String.valueOf(recipientMap.get("drop-box-id"));
}
String messageType;
AgentConfigDto.MessageType messageType;
if(map.get("message-type") == null) {
throw new InCompleteMessageException("message-type 엘리먼트를 찾을 수 없습니다.");
}
@ -114,10 +113,11 @@ public class MessageUtils {
throw new InCompleteMessageException("message-type 값을 찾을 수 없습니다.");
}
else {
messageType = String.valueOf(map.get("message-type"));
if(!EnumUtils.isValidEnum(MessageType.class, messageType)) {
throw new InCompleteMessageException("message-type 값이 옳바르지 않습니다. " + messageType);
String messageTypeString = String.valueOf(map.get("message-type"));
if(!EnumUtils.isValidEnum(AgentConfigDto.MessageType.class, messageTypeString)) {
throw new InCompleteMessageException("message-type 값이 옳바르지 않습니다. " + messageTypeString);
}
messageType = EnumUtils.getEnum(AgentConfigDto.MessageType.class, messageTypeString);
}
String dataString;
if(map.get("data") == null) {
@ -135,7 +135,7 @@ public class MessageUtils {
.senderHostId(senderHostId).senderTimestamp(senderTimestamp)
.messageUuid(messageUuid).messageType(messageType).receivedTimestamp(receivedTimestamp)
.recipientHostId(recipientHostId).recipientDropBoxId(recipientDropBoxId)
.data(dataString).processStatus(ProcessStatusType.READ.name())
.data(dataString).processStatus(ReceiveMessageDto.ProcessStatus.PROCESS_RECEIVED)
.build();
}
catch(JsonProcessingException e) {

@ -5,10 +5,13 @@ import com.bsmlab.dfx.agent.config.AgentConfigReader;
import com.bsmlab.dfx.agent.config.datasource.SqlExecuteService;
import com.bsmlab.dfx.agent.listener.dto.AckDto;
import com.bsmlab.dfx.agent.listener.dto.ReceiveMessageDto;
import com.bsmlab.dfx.agent.task.dropbox.DropBoxService;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.common.util.StringUtils;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.EnumUtils;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
@ -16,10 +19,7 @@ import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.*;
import java.util.List;
import java.util.Map;
@ -29,12 +29,20 @@ import java.util.Map;
public class TaskExecutorService {
private final AgentConfigReader agentConfigReader;
private final SqlExecuteService sqlExecuteService;
private final DropBoxService dropBoxService;
@Async("threadPoolTaskExecutor")
public void processDropBox(String messageFilePath) {
//TODO 1. 메시지 처리
ReceiveMessageDto receiveMessageDto = ReceiveMessageDto.builder().build();
ReceiveMessageDto.ProcessStatus processStatus = ReceiveMessageDto.ProcessStatus.PROCESS_DONE;
String processMessage = "";
String messageUuid = "";
try (ObjectInputStream objectInputStream = new ObjectInputStream(new FileInputStream(messageFilePath))) {
ReceiveMessageDto receiveMessageDto = (ReceiveMessageDto)objectInputStream.readObject();
receiveMessageDto = (ReceiveMessageDto)objectInputStream.readObject();
messageUuid = receiveMessageDto.getMessageUuid();
AgentConfigDto.DropBox dropBox = agentConfigReader.getDropBox(receiveMessageDto.getRecipientDropBoxId());
log.info("process messageUuid:{} dropBoxId: {}", receiveMessageDto.getMessageUuid(), dropBox.getDropBoxId());
if(dropBox.getTaskType() == AgentConfigDto.TaskType.SAVE_DB_TABLE) {
ObjectMapper objectMapper = new ObjectMapper();
List<Map<String, Object>> dataMapList = null;
@ -44,17 +52,47 @@ public class TaskExecutorService {
}
}
else if(dropBox.getTaskType() == AgentConfigDto.TaskType.RECEIVE_FILE) {
//TODO 1.2 파일 수신 처리
}
} catch (FileNotFoundException e) {
//TODO 수신한 메시지 파일을 찾지 못했으니 메시지 처리 불가. 로그 남기고 Ack.PROCESS_FAIL 전달
throw new RuntimeException(e);
//수신한 메시지 파일을 알 수 없으므로 로그 남기고 종료
dropBoxService.clearMessageFile(messageFilePath);
processStatus = ReceiveMessageDto.ProcessStatus.PROCESS_NOT_POSSIBLE;
processMessage = e.getMessage();
log.error("process not possible - messageFilePath: ");
log.error("{}", e, e);
} catch (IOException e) {
//TODO 수신한 메시지를 로드하지 못했으니 메시지 처리 불가. 로그 남기고 파일을 미처리 상태 경로로 옮기고 Ack.PROCESS_FAIL 전달
throw new RuntimeException(e);
//수신한 메시지를 로드하지 못했으니 메시지 처리 불가. ReceiveMessageDto 를 알 수 없으므로 로그 남기고 종료
processStatus = ReceiveMessageDto.ProcessStatus.PROCESS_FAIL;
processMessage = e.getMessage();
log.error("cannot load a message file - messageFilePath: ");
log.error("{}", e, e);
} catch (ClassNotFoundException e) {
//TODO ReceiveMessageDto 변환 실패. 로그 남기고 파일을 미처리 상태 경로로 옮기고 Ack.PROCESS_FAIL 전달
throw new RuntimeException(e);
//ReceiveMessageDto 변환 실패. 로그 남기고 파일을 미처리 상태 경로로 옮기고 ReceiveMessageDto 를 알 수 없으므로 종료
processStatus = ReceiveMessageDto.ProcessStatus.PROCESS_FAIL;
processMessage = e.getMessage();
log.error("cannot parse a message file - messageFilePath: ");
log.error("{}", e, e);
} catch (Exception e) {
//기타 메시지 처리 중 오류
processStatus = ReceiveMessageDto.ProcessStatus.PROCESS_FAIL;
processMessage = e.getMessage();
log.error("cannot process a message file - messageFilePath: ");
log.error("{}", e, e);
}
//2. 결과 Ack 전송
if(StringUtils.isNotBlank(receiveMessageDto.getMessageUuid()) && StringUtils.isNotBlank(receiveMessageDto.getSenderHostId())) {
receiveMessageDto.setProcessStatus(processStatus);
this.ackDropBoxProcessResult(receiveMessageDto, processMessage);
}
//3. 처리한 메시지 이동
try {
boolean isSuccess = ReceiveMessageDto.ProcessStatus.PROCESS_DONE == processStatus; // PROCESS_DONE 이면 success 경로로 옮기고 아니면 failure 경로로 옮김
if(ReceiveMessageDto.ProcessStatus.PROCESS_NOT_POSSIBLE != processStatus) { // PROCESS_NOT_POSSIBLE 인 경우는 메시지 파일을 찾을 수 없으므로 옮길 수 없음
dropBoxService.moveMessageFile(messageFilePath, isSuccess);
}
} catch (IOException ex) {
log.error("{}", ex, ex);
}
}
@ -63,23 +101,25 @@ public class TaskExecutorService {
}
private void ackDropBoxProcessResult(ReceiveMessageDto receiveMessageDto) {
AckDto ackDto = AckDto.builder().build();
private void ackDropBoxProcessResult(ReceiveMessageDto receiveMessageDto, String processMessage) {
AckDto.ResultType resultType = EnumUtils.getEnum(AckDto.ResultType.class, receiveMessageDto.getProcessStatus().toString());
AckDto ackDto = AckDto.builder().result(resultType).messageUuid(receiveMessageDto.getMessageUuid()).resultText(processMessage).build();
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<AckDto> bodyEntity = new HttpEntity<>(ackDto, httpHeaders);
RestTemplate restTemplate = new RestTemplate();
//TODO sender 정보를 찾아야 함.
AgentConfigDto.KnownAgent knownAgent = agentConfigReader.getKnownAgent(receiveMessageDto.getSenderHostId());
String url = "https://" + knownAgent.getHostName() + "/telegram";
String response = restTemplate.postForObject("hostname", bodyEntity, String.class);
String url = "https://" + knownAgent.getHostName() + ":" + knownAgent.getListenPort() + "/telegram";
String response = restTemplate.postForObject(url, bodyEntity, String.class);
ObjectMapper objectMapper = new ObjectMapper();
AckDto responseAckDto = null;
try {
AckDto responseAckDto = objectMapper.readValue(response, AckDto.class);
// 수신 메시지 처리 완료. 메시지 삭제
responseAckDto = objectMapper.readValue(response, AckDto.class);
log.info("message processing is done. {}", receiveMessageDto.getMessageUuid());
log.info("response {}", responseAckDto);
} catch (JsonProcessingException e) {
//TODO 처리 결과 Ack 파싱 실패. 로그 남기고 메시지 삭제
throw new RuntimeException(e);
log.error("Exception after sending ACK. messageUuid: {} ", receiveMessageDto.getMessageUuid());
log.error("{}", e, e);
}
}
}

@ -1,6 +1,5 @@
package com.bsmlab.dfx.agent.task;
import com.bsmlab.dfx.agent.config.Settings;
import com.bsmlab.dfx.agent.task.dropbox.DropBoxService;
import io.micrometer.common.util.StringUtils;
import jakarta.annotation.PostConstruct;
@ -14,7 +13,6 @@ import org.springframework.stereotype.Component;
public class TaskExecutorStarter {
private final DropBoxService dropBoxService;
private final TaskExecutorService taskExecutorService;
private final Settings settings;
@PostConstruct
public void run() {

@ -1,5 +1,6 @@
package com.bsmlab.dfx.agent.task.dropbox;
import com.bsmlab.dfx.agent.config.AgentConfigDto;
import lombok.Builder;
import lombok.Data;
@ -7,7 +8,7 @@ import lombok.Data;
@Builder
public class DropBoxDto {
private String dropBoxId;
private String taskType;
private AgentConfigDto.TaskType taskType;
private String dataSourceId;
private String sqlId;
private String saveDirectoryRoot;

@ -1,13 +1,16 @@
package com.bsmlab.dfx.agent.task.dropbox;
import com.bsmlab.dfx.agent.config.AgentConfigReader;
import com.bsmlab.dfx.agent.config.Settings;
import com.bsmlab.dfx.agent.listener.dto.ReceiveMessageDto;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
@ -31,7 +34,7 @@ public class DropBoxService {
*/
public boolean isExistToday(ReceiveMessageDto receiveMessageDto) {
boolean isExist = false;
File root = new File(agentConfigReader.getAgentConfigDto().getDropBox().getMessageStorageRoot());
File root = new File(agentConfigReader.getAgentConfigDto().getDropBox().getReceivedMessageStorageRoot());
Date today = new Date(System.currentTimeMillis());
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd/");
String todayDirectoryString = root.getAbsolutePath() + "/" + dateFormat.format(today);
@ -51,7 +54,7 @@ public class DropBoxService {
* @param receiveMessageDto
*/
public void add(ReceiveMessageDto receiveMessageDto) {
File root = new File(agentConfigReader.getAgentConfigDto().getDropBox().getMessageStorageRoot());
File root = new File(agentConfigReader.getAgentConfigDto().getDropBox().getReceivedMessageStorageRoot());
Date today = new Date(System.currentTimeMillis());
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd/hh");
String targetDirectoryString = root.getAbsolutePath() + "/" + dateFormat.format(today);
@ -79,4 +82,34 @@ public class DropBoxService {
public String poll() {
return queue.poll();
}
public void moveMessageFile(String messageFilePath, boolean isSuccess) throws IOException {
File root = null;
if(isSuccess) {
root = new File(agentConfigReader.getAgentConfigDto().getDropBox().getProcessedMessageStorageRoot());
}
else {
root = new File(agentConfigReader.getAgentConfigDto().getDropBox().getFailureMessageStorageRoot());
}
Date today = new Date(System.currentTimeMillis());
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd/hh");
String targetDirectoryString = root.getAbsolutePath() + "/" + dateFormat.format(today);
File targetDirectory = new File(targetDirectoryString);
if(!targetDirectory.exists()) {
targetDirectory.mkdirs();
}
File messageFile = new File(messageFilePath);
Path source = messageFile.toPath();
Path target = Paths.get(targetDirectoryString, messageFile.getName());
Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
log.info("a message file moved. from [{}] to [{}]", messageFilePath, target.toAbsolutePath());
}
public void clearMessageFile(String messageFilePath) {
File messageFile = new File(messageFilePath);
if(messageFile.exists()) {
messageFile.delete();
}
log.info("a message file cleared. [{}]", messageFilePath);
}
}

@ -1,8 +1,6 @@
package com.bsmlab.dfx.agent.task.postman;
import com.bsmlab.dfx.agent.config.constant.ActionType;
import com.bsmlab.dfx.agent.config.constant.MessageType;
import com.bsmlab.dfx.agent.config.constant.TaskType;
import com.bsmlab.dfx.agent.config.AgentConfigDto;
import lombok.Builder;
import lombok.Data;
@ -13,21 +11,24 @@ import java.util.List;
@Builder
public class PostmanDto {
private String postmanId;
private TaskType taskType;
private AgentConfigDto.TaskType taskType;
private PostmanActionType postmanActionType;
private PostmanMessageType postmanMessageType;
private String recipientHostId;
private String recipientDropBoxId;
private List<String> routingHostIdList;
public class PostmanActionType {
private ActionType actionType;
@Data
public static class PostmanActionType {
private AgentConfigDto.ActionType actionType;
private String command;
private List<String> parameterKeyList = new ArrayList<>();
private String cron;
}
public class PostmanMessageType {
private MessageType messageType;
@Data
public static class PostmanMessageType {
private AgentConfigDto.MessageType messageType;
private String dataSourceId;
private String sqlId;
}

Loading…
Cancel
Save