From 7e3458ee7a008de7005c45e68fd841452e46974f Mon Sep 17 00:00:00 2001 From: "semin.baek" Date: Thu, 13 Mar 2025 16:34:28 +0900 Subject: [PATCH] =?UTF-8?q?DropBox(=EB=A9=94=EC=8B=9C=EC=A7=80=20=EC=B2=98?= =?UTF-8?q?=EB=A6=AC)=20=EC=A7=84=ED=96=89=EC=A4=91=20=20-=20DB=20to=20DB?= =?UTF-8?q?=20=EC=B2=98=EB=A6=AC=EC=A4=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/docs/settings-examples/dfxagent.json | 29 +++++---- .../agent/config/DfxAgentConfiguration.java | 14 ++++ .../com/bsmlab/dfx/agent/config/Settings.java | 17 ++++- .../dfx/agent/config/StartupRunner.java | 5 +- .../agent/listener/ListenerController.java | 4 +- .../bsmlab/dfx/agent/listener/dto/AckDto.java | 16 ++++- .../agent/listener/dto/ReceiveMessageDto.java | 6 +- .../listener/service/ListenerService.java | 27 +++++++- .../dfx/agent/support/MessageUtils.java | 17 ++++- .../dfx/agent/task/TaskExecutorService.java | 52 +++++++++++++++ .../dfx/agent/task/TaskExecutorStarter.java | 33 ++++++++++ .../agent/task/dropbox/DropBoxService.java | 64 +++++++++++++++++++ 12 files changed, 256 insertions(+), 28 deletions(-) create mode 100644 src/main/java/com/bsmlab/dfx/agent/task/TaskExecutorService.java create mode 100644 src/main/java/com/bsmlab/dfx/agent/task/TaskExecutorStarter.java create mode 100644 src/main/java/com/bsmlab/dfx/agent/task/dropbox/DropBoxService.java diff --git a/src/docs/settings-examples/dfxagent.json b/src/docs/settings-examples/dfxagent.json index 6dc4596..73f6bba 100644 --- a/src/docs/settings-examples/dfxagent.json +++ b/src/docs/settings-examples/dfxagent.json @@ -16,17 +16,20 @@ } ], "sqlMapperLocations": ["D:/projects/bsm-lab/dfx/dfxagent/src/docs/mapper-examples/**/*.xml"], - "drop-box": [ - { - "drop-box-id": "save-violation-history", - "task-type": "SAVE_DATA_TABLE", - "dataSourceId": "dfcms", - "sql-id": "dfcms.violation.insertViolationHistory" - }, - { - "drop-box-id": "receive-work-image-file", - "task-type": "RECEIVE_FILE", - "save-directory-root": "D:\\projects\\bsm-lab\\dfx\\run\\receive_file" - } - ] + "drop-box": { + "message-storage-root": "D:/projects/bsm-lab/dfx/dfxagent/src/docs/messages", + "drop-box-list": [ + { + "drop-box-id": "save-violation-history", + "task-type": "SAVE_DATA_TABLE", + "dataSourceId": "dfcms", + "sql-id": "dfcms.violation.insertViolationHistory" + }, + { + "drop-box-id": "receive-work-image-file", + "task-type": "RECEIVE_FILE", + "save-directory-root": "D:\\projects\\bsm-lab\\dfx\\run\\receive_file" + } + ] + } } diff --git a/src/main/java/com/bsmlab/dfx/agent/config/DfxAgentConfiguration.java b/src/main/java/com/bsmlab/dfx/agent/config/DfxAgentConfiguration.java index b488fc9..5bf1c19 100644 --- a/src/main/java/com/bsmlab/dfx/agent/config/DfxAgentConfiguration.java +++ b/src/main/java/com/bsmlab/dfx/agent/config/DfxAgentConfiguration.java @@ -21,6 +21,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import javax.sql.DataSource; import java.io.File; @@ -31,6 +32,7 @@ import java.nio.file.Paths; import java.nio.file.StandardCopyOption; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Executor; @Slf4j @Configuration @@ -181,4 +183,16 @@ public class DfxAgentConfiguration { public DataSourceTransactionManager innerTransactionManager(@Qualifier("innerDataSource") DataSource dataSource) { return new DataSourceTransactionManager(dataSource); } + + @Bean(name = "threadPoolTaskExecutor") + public Executor threadPoolTaskExecutor() { + ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); + threadPoolTaskExecutor.setCorePoolSize(30); // 최소 쓰레드 + threadPoolTaskExecutor.setMaxPoolSize(300); // 최대 쓰레드 + threadPoolTaskExecutor.setQueueCapacity(300); // 대기 큐 + threadPoolTaskExecutor.setThreadNamePrefix("dfxExecutor-"); + threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true); // 종료 시 대기 여부 + threadPoolTaskExecutor.initialize(); + return threadPoolTaskExecutor; + } } diff --git a/src/main/java/com/bsmlab/dfx/agent/config/Settings.java b/src/main/java/com/bsmlab/dfx/agent/config/Settings.java index aeba75b..2bbe58b 100644 --- a/src/main/java/com/bsmlab/dfx/agent/config/Settings.java +++ b/src/main/java/com/bsmlab/dfx/agent/config/Settings.java @@ -22,6 +22,7 @@ import java.util.Map; @Component public class Settings { private final Map dataSourceDtoMap = new HashMap<>(); + private String messageStorageRootPath; private Map dropBoxDtoMap = new HashMap<>(); private Resource[] mapperLocations; public void loadSettingFile(String settingFilePath) { @@ -69,7 +70,13 @@ public class Settings { @SuppressWarnings("unchecked") private void parseDropBoxes(Map settingMap) { - List> dropBoxMapList = (List>)settingMap.get("drop-box"); + Map dropBoxSettingMap = (Map)settingMap.get("drop-box"); + this.messageStorageRootPath = String.valueOf(dropBoxSettingMap.get("message-storage-root")); + File messageStorageRootFile = new File(this.messageStorageRootPath); + if(!messageStorageRootFile.exists()) { + messageStorageRootFile.mkdirs(); + } + List> dropBoxMapList = (List>)dropBoxSettingMap.get("drop-box-list"); //TODO drop-box executor 개발 -> ListenerController for(Map dropBoxMap : dropBoxMapList) { DropBoxDto dropBoxDto = DropBoxDto.builder() @@ -92,4 +99,12 @@ public class Settings { private Map getDropBoxDtoMap() { return this.dropBoxDtoMap; } + + public String getMessageStorageRootPath() { + return this.messageStorageRootPath; + } + + public DropBoxDto getDropBoxDto(String dropBoxId) { + return this.dropBoxDtoMap.get(dropBoxId); + } } diff --git a/src/main/java/com/bsmlab/dfx/agent/config/StartupRunner.java b/src/main/java/com/bsmlab/dfx/agent/config/StartupRunner.java index 4817a5e..8e5a82d 100644 --- a/src/main/java/com/bsmlab/dfx/agent/config/StartupRunner.java +++ b/src/main/java/com/bsmlab/dfx/agent/config/StartupRunner.java @@ -10,12 +10,11 @@ import org.springframework.stereotype.Component; @RequiredArgsConstructor @Component public class StartupRunner implements ApplicationRunner { + private final Settings settings; + @Override public void run(ApplicationArguments args) throws Exception { log.debug("run"); - // 1. dfxagent.yml 로드 및 설정 관리자에 저장 : DfxAgentConfiguration, Settings - // 2. embedded db 체크 및 생성 : DfxAgentConfiguration - // 3. dfxagent.yml 을 기반으로 데이터소스 및 커넥션 풀 생성 : DfxAgentConfiguration, datasource.* //TODO 4. Worker 쓰레드 생성 } } diff --git a/src/main/java/com/bsmlab/dfx/agent/listener/ListenerController.java b/src/main/java/com/bsmlab/dfx/agent/listener/ListenerController.java index bffca4b..598affe 100644 --- a/src/main/java/com/bsmlab/dfx/agent/listener/ListenerController.java +++ b/src/main/java/com/bsmlab/dfx/agent/listener/ListenerController.java @@ -22,9 +22,9 @@ public class ListenerController { AckDto ackDto = AckDto.builder().build(); try { String bodyString = ServletUtils.getBodyString(request); - ackDto = listenerService.saveNewMessage(bodyString); + ackDto = listenerService.receiveMessage(bodyString); } catch (IOException e) { - ackDto.setResult("fail"); + ackDto.setResult(AckDto.RECEIVE_FAIL); ackDto.setMessage(e.getMessage()); } return ackDto; diff --git a/src/main/java/com/bsmlab/dfx/agent/listener/dto/AckDto.java b/src/main/java/com/bsmlab/dfx/agent/listener/dto/AckDto.java index 438cebf..bde64ae 100644 --- a/src/main/java/com/bsmlab/dfx/agent/listener/dto/AckDto.java +++ b/src/main/java/com/bsmlab/dfx/agent/listener/dto/AckDto.java @@ -7,9 +7,21 @@ import lombok.Setter; @Getter @Setter @Builder +/** + * Listen 프로세스 + * 1. listener 수신 + * 2. 수신 메시지 저장 (파일 저장 후 메시지 큐(DropBoxQueue) 에 추가) + * 3. 수신 메시지 저장 완료 Ack 전달 (result : RECEIVE_SUCCESS) + * 3.1 수신 메시지 저상 싪패시 Ack 전달 (result : RECEIVE_FAIL) + * 4. Worker 동작 (메지시 큐에서 가져오고 저장된 메시지를 규칙에 맞게 처리) + * 4.1 Worker 동작 성공시 sender 에게 처리완료 Ack 전달 (result : PROCESS_SUCCESS) + * 4.2 Worker 동작 실패시 sender 에게 처리오류 Ack 전달 (result : PROCESS_FAIL) + */ public class AckDto { - public static final String RESULT_SUCCESS = "SUCCESS"; - public static final String RESULT_FAIL = "FAIL"; + public static final String RECEIVE_SUCCESS = "RECEIVE_SUCCESS"; + public static final String RECEIVE_FAIL = "RECEIVE_FAIL"; + public static final String PROCESS_SUCCESS = "PROCESS_SUCCESS"; + public static final String PROCESS_FAIL = "PROCESS_FAIL"; private String result; private String message; } diff --git a/src/main/java/com/bsmlab/dfx/agent/listener/dto/ReceiveMessageDto.java b/src/main/java/com/bsmlab/dfx/agent/listener/dto/ReceiveMessageDto.java index ae555c8..65836ef 100644 --- a/src/main/java/com/bsmlab/dfx/agent/listener/dto/ReceiveMessageDto.java +++ b/src/main/java/com/bsmlab/dfx/agent/listener/dto/ReceiveMessageDto.java @@ -2,19 +2,21 @@ package com.bsmlab.dfx.agent.listener.dto; import lombok.*; +import java.io.Serializable; + @Getter @Setter @Builder @AllArgsConstructor @NoArgsConstructor -public class ReceiveMessageDto { +public class ReceiveMessageDto implements Serializable { private String senderHostId; private long senderTimestamp; private String messageUuid; private String messageType; private long receivedTimestamp; private String recipientHostId; - private String recipientDropBox; + private String recipientDropBoxId; private String data; private String processStatus; } diff --git a/src/main/java/com/bsmlab/dfx/agent/listener/service/ListenerService.java b/src/main/java/com/bsmlab/dfx/agent/listener/service/ListenerService.java index d2acd79..c596f5f 100644 --- a/src/main/java/com/bsmlab/dfx/agent/listener/service/ListenerService.java +++ b/src/main/java/com/bsmlab/dfx/agent/listener/service/ListenerService.java @@ -1,5 +1,6 @@ package com.bsmlab.dfx.agent.listener.service; +import com.bsmlab.dfx.agent.config.Settings; import com.bsmlab.dfx.agent.config.datasource.SqlExecuteService; import com.bsmlab.dfx.agent.listener.dto.AckDto; import com.bsmlab.dfx.agent.listener.dto.ListenerMapper; @@ -8,6 +9,7 @@ import com.bsmlab.dfx.agent.support.MessageUtils; import com.bsmlab.dfx.agent.support.exception.IllegalMessageException; import com.bsmlab.dfx.agent.support.exception.InCompleteMessageException; import com.bsmlab.dfx.agent.support.exception.NullMessageException; +import com.bsmlab.dfx.agent.task.dropbox.DropBoxService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -16,6 +18,7 @@ import org.springframework.stereotype.Service; @Slf4j @Service public class ListenerService { + private final DropBoxService dropBoxService; private final ListenerMapper listenerMapper; private final SqlExecuteService sqlExecuteService; @@ -25,17 +28,35 @@ public class ListenerService { ReceiveMessageDto receiveMessageDto = MessageUtils.toReceiveMessageDto(messageJsonString); int counter = listenerMapper.selectReceiveMessageCountByPk(receiveMessageDto); if(counter > 0) { - ackDto = AckDto.builder().result(AckDto.RESULT_FAIL).message("이전 전송한 메시지 중 중복된 UUID가 존재합니다.").build(); + ackDto = AckDto.builder().result(AckDto.RECEIVE_FAIL).message("이전 전송한 메시지 중 중복된 UUID가 존재합니다.").build(); } else { listenerMapper.insertReceiveMessage(receiveMessageDto); - ackDto = AckDto.builder().result(AckDto.RESULT_SUCCESS).build(); + ackDto = AckDto.builder().result(AckDto.RECEIVE_SUCCESS).build(); } } catch (IllegalMessageException | NullMessageException | InCompleteMessageException e) { log.error("{}", e, e); - ackDto = AckDto.builder().result(AckDto.RESULT_FAIL).message(e.getLocalizedMessage()).build(); + ackDto = AckDto.builder().result(AckDto.RECEIVE_FAIL).message(e.getLocalizedMessage()).build(); + } + return ackDto; + } + + public AckDto receiveMessage(String messageJsonString) { + AckDto ackDto = null; + try { + ReceiveMessageDto receiveMessageDto = MessageUtils.toReceiveMessageDto(messageJsonString); + if(dropBoxService.isExistToday(receiveMessageDto)) { + ackDto = AckDto.builder().result(AckDto.RECEIVE_FAIL).message("금일 전송한 메시지 중 중복된 UUID가 존재합니다.").build(); + } + else { + dropBoxService.add(receiveMessageDto); + ackDto = AckDto.builder().result(AckDto.RECEIVE_SUCCESS).build(); + } + } catch (IllegalMessageException | NullMessageException | InCompleteMessageException e) { + log.error("{}", e, e); + ackDto = AckDto.builder().result(AckDto.RECEIVE_FAIL).message(e.getLocalizedMessage()).build(); } return ackDto; } diff --git a/src/main/java/com/bsmlab/dfx/agent/support/MessageUtils.java b/src/main/java/com/bsmlab/dfx/agent/support/MessageUtils.java index 381d210..f1d7d0d 100644 --- a/src/main/java/com/bsmlab/dfx/agent/support/MessageUtils.java +++ b/src/main/java/com/bsmlab/dfx/agent/support/MessageUtils.java @@ -13,6 +13,7 @@ import org.apache.commons.lang3.EnumUtils; import org.apache.commons.lang3.StringUtils; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; @@ -118,11 +119,23 @@ public class MessageUtils { throw new InCompleteMessageException("message-type 값이 옳바르지 않습니다. " + messageType); } } + String dataString; + if(map.get("data") == null) { + throw new InCompleteMessageException("data 엘리먼트를 찾을 수 없습니다."); + } + else if(!(map.get("data") instanceof List)) { + throw new InCompleteMessageException("data 엘리먼트의 데이터가 객체타입이 아닙니다."); + } + else { + List> dataList = (List>) map.get("data"); + dataString = objectMapper.writeValueAsString(dataList); + + } receiveMessageDto = ReceiveMessageDto.builder() .senderHostId(senderHostId).senderTimestamp(senderTimestamp) .messageUuid(messageUuid).messageType(messageType).receivedTimestamp(receivedTimestamp) - .recipientHostId(recipientHostId).recipientDropBox(recipientDropBoxId) - .data(messageJsonString).processStatus(ProcessStatusType.READ.name()) + .recipientHostId(recipientHostId).recipientDropBoxId(recipientDropBoxId) + .data(dataString).processStatus(ProcessStatusType.READ.name()) .build(); } catch(JsonProcessingException e) { diff --git a/src/main/java/com/bsmlab/dfx/agent/task/TaskExecutorService.java b/src/main/java/com/bsmlab/dfx/agent/task/TaskExecutorService.java new file mode 100644 index 0000000..1fa76cf --- /dev/null +++ b/src/main/java/com/bsmlab/dfx/agent/task/TaskExecutorService.java @@ -0,0 +1,52 @@ +package com.bsmlab.dfx.agent.task; + +import com.bsmlab.dfx.agent.config.Settings; +import com.bsmlab.dfx.agent.config.constant.MessageType; +import com.bsmlab.dfx.agent.config.datasource.SqlExecuteService; +import com.bsmlab.dfx.agent.listener.dto.ReceiveMessageDto; +import com.bsmlab.dfx.agent.task.dropbox.DropBoxDto; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.EnumUtils; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import java.io.*; +import java.util.List; +import java.util.Map; + +@Service +@RequiredArgsConstructor +@Slf4j +public class TaskExecutorService { + private final Settings settings; + private final SqlExecuteService sqlExecuteService; + + @Async("threadPoolTaskExecutor") + public void process(String messageFilePath) { + try (ObjectInputStream objectInputStream = new ObjectInputStream(new FileInputStream(messageFilePath))) { + ReceiveMessageDto receiveMessageDto = (ReceiveMessageDto)objectInputStream.readObject(); + DropBoxDto dropBoxDto = settings.getDropBoxDto(receiveMessageDto.getRecipientDropBoxId()); + if("SAVE_DATA_TABLE".equals(dropBoxDto.getTaskType())) { + //public Map insert(String dataSourceId, String sqlId, Map parameter) { + String dataString = receiveMessageDto.getData(); + List> dataMapList = null; //(List>)receiveMessageDto.getData(); + ObjectMapper objectMapper = new ObjectMapper(); + dataMapList = (List>) objectMapper.readValue(receiveMessageDto.getData(), List.class); + for(Map dataMap : dataMapList) { + sqlExecuteService.insert(dropBoxDto.getDataSourceId(), dropBoxDto.getSqlId(), dataMap); + } + } + else if("RECEIVE_FILE".equals(dropBoxDto.getTaskType())) { + + } + } catch (FileNotFoundException e) { + throw new RuntimeException(e); + } catch (IOException e) { + throw new RuntimeException(e); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + } +} diff --git a/src/main/java/com/bsmlab/dfx/agent/task/TaskExecutorStarter.java b/src/main/java/com/bsmlab/dfx/agent/task/TaskExecutorStarter.java new file mode 100644 index 0000000..c25424d --- /dev/null +++ b/src/main/java/com/bsmlab/dfx/agent/task/TaskExecutorStarter.java @@ -0,0 +1,33 @@ +package com.bsmlab.dfx.agent.task; + +import com.bsmlab.dfx.agent.task.dropbox.DropBoxService; +import io.micrometer.common.util.StringUtils; +import jakarta.annotation.PostConstruct; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +@Component +@RequiredArgsConstructor +@Slf4j +public class TaskExecutorStarter { + private final DropBoxService dropBoxService; + private final TaskExecutorService taskExecutorService; + + @PostConstruct + public void run() { + while(true) { + try { + Thread.sleep(10); + String messageFilePath = dropBoxService.poll(); + if(StringUtils.isNotBlank(messageFilePath)) { + taskExecutorService.process(messageFilePath); + } + } catch (InterruptedException e) { + log.error("{}", e, e); + Thread.currentThread().interrupt(); + } + + } + } +} diff --git a/src/main/java/com/bsmlab/dfx/agent/task/dropbox/DropBoxService.java b/src/main/java/com/bsmlab/dfx/agent/task/dropbox/DropBoxService.java new file mode 100644 index 0000000..0cd4619 --- /dev/null +++ b/src/main/java/com/bsmlab/dfx/agent/task/dropbox/DropBoxService.java @@ -0,0 +1,64 @@ +package com.bsmlab.dfx.agent.task.dropbox; + +import com.bsmlab.dfx.agent.config.Settings; +import com.bsmlab.dfx.agent.listener.dto.ReceiveMessageDto; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import java.io.*; +import java.text.DecimalFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.concurrent.ConcurrentLinkedQueue; + +@Service +@RequiredArgsConstructor +@Slf4j +public class DropBoxService { + private final Settings settings; + private ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + + public boolean isExistToday(ReceiveMessageDto receiveMessageDto) { + boolean isExist = false; + File root = new File(settings.getMessageStorageRootPath()); + Date today = new Date(System.currentTimeMillis()); + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd/"); + String todayDirectoryString = root.getAbsolutePath() + "/" + dateFormat.format(today); + DecimalFormat decimalFormat = new DecimalFormat("00"); + for(int i = 0; i < 24; i++) { + File targetFilePath = new File(todayDirectoryString + decimalFormat.format(i) + "/" + receiveMessageDto.getMessageUuid()); + isExist = targetFilePath.exists(); + if(isExist) { + break; + } + } + return isExist; + } + + public void add(ReceiveMessageDto receiveMessageDto) { + File root = new File(settings.getMessageStorageRootPath()); + Date today = new Date(System.currentTimeMillis()); + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd/hh"); + String targetDirectoryString = root.getAbsolutePath() + "/" + dateFormat.format(today); + File targetDirectory = new File(targetDirectoryString); + if(!targetDirectory.exists()) { + targetDirectory.mkdirs(); + } + String targetFilePath = targetDirectoryString + "/" + receiveMessageDto.getMessageUuid(); + try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(targetFilePath)))) { + objectOutputStream.writeObject(receiveMessageDto); + this.queue.add(targetFilePath); + } + catch (FileNotFoundException e) { + throw new RuntimeException(e); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + + public String poll() { + return queue.poll(); + } +}