DropBox(메시지 처리) 진행중

- DB to DB 처리중
main
semin.baek 10 months ago
parent 3355d16f28
commit 7e3458ee7a

@ -16,7 +16,9 @@
} }
], ],
"sqlMapperLocations": ["D:/projects/bsm-lab/dfx/dfxagent/src/docs/mapper-examples/**/*.xml"], "sqlMapperLocations": ["D:/projects/bsm-lab/dfx/dfxagent/src/docs/mapper-examples/**/*.xml"],
"drop-box": [ "drop-box": {
"message-storage-root": "D:/projects/bsm-lab/dfx/dfxagent/src/docs/messages",
"drop-box-list": [
{ {
"drop-box-id": "save-violation-history", "drop-box-id": "save-violation-history",
"task-type": "SAVE_DATA_TABLE", "task-type": "SAVE_DATA_TABLE",
@ -29,4 +31,5 @@
"save-directory-root": "D:\\projects\\bsm-lab\\dfx\\run\\receive_file" "save-directory-root": "D:\\projects\\bsm-lab\\dfx\\run\\receive_file"
} }
] ]
}
} }

@ -21,6 +21,7 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import javax.sql.DataSource; import javax.sql.DataSource;
import java.io.File; import java.io.File;
@ -31,6 +32,7 @@ import java.nio.file.Paths;
import java.nio.file.StandardCopyOption; import java.nio.file.StandardCopyOption;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executor;
@Slf4j @Slf4j
@Configuration @Configuration
@ -181,4 +183,16 @@ public class DfxAgentConfiguration {
public DataSourceTransactionManager innerTransactionManager(@Qualifier("innerDataSource") DataSource dataSource) { public DataSourceTransactionManager innerTransactionManager(@Qualifier("innerDataSource") DataSource dataSource) {
return new DataSourceTransactionManager(dataSource); return new DataSourceTransactionManager(dataSource);
} }
@Bean(name = "threadPoolTaskExecutor")
public Executor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(30); // 최소 쓰레드
threadPoolTaskExecutor.setMaxPoolSize(300); // 최대 쓰레드
threadPoolTaskExecutor.setQueueCapacity(300); // 대기 큐
threadPoolTaskExecutor.setThreadNamePrefix("dfxExecutor-");
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true); // 종료 시 대기 여부
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
} }

@ -22,6 +22,7 @@ import java.util.Map;
@Component @Component
public class Settings { public class Settings {
private final Map<String, DataSourceDto> dataSourceDtoMap = new HashMap<>(); private final Map<String, DataSourceDto> dataSourceDtoMap = new HashMap<>();
private String messageStorageRootPath;
private Map<String, DropBoxDto> dropBoxDtoMap = new HashMap<>(); private Map<String, DropBoxDto> dropBoxDtoMap = new HashMap<>();
private Resource[] mapperLocations; private Resource[] mapperLocations;
public void loadSettingFile(String settingFilePath) { public void loadSettingFile(String settingFilePath) {
@ -69,7 +70,13 @@ public class Settings {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void parseDropBoxes(Map<String, Object> settingMap) { private void parseDropBoxes(Map<String, Object> settingMap) {
List<Map<String, String>> dropBoxMapList = (List<Map<String, String>>)settingMap.get("drop-box"); Map<String, Object> dropBoxSettingMap = (Map<String, Object>)settingMap.get("drop-box");
this.messageStorageRootPath = String.valueOf(dropBoxSettingMap.get("message-storage-root"));
File messageStorageRootFile = new File(this.messageStorageRootPath);
if(!messageStorageRootFile.exists()) {
messageStorageRootFile.mkdirs();
}
List<Map<String, String>> dropBoxMapList = (List<Map<String, String>>)dropBoxSettingMap.get("drop-box-list");
//TODO drop-box executor 개발 -> ListenerController //TODO drop-box executor 개발 -> ListenerController
for(Map<String, String> dropBoxMap : dropBoxMapList) { for(Map<String, String> dropBoxMap : dropBoxMapList) {
DropBoxDto dropBoxDto = DropBoxDto.builder() DropBoxDto dropBoxDto = DropBoxDto.builder()
@ -92,4 +99,12 @@ public class Settings {
private Map<String, DropBoxDto> getDropBoxDtoMap() { private Map<String, DropBoxDto> getDropBoxDtoMap() {
return this.dropBoxDtoMap; return this.dropBoxDtoMap;
} }
public String getMessageStorageRootPath() {
return this.messageStorageRootPath;
}
public DropBoxDto getDropBoxDto(String dropBoxId) {
return this.dropBoxDtoMap.get(dropBoxId);
}
} }

@ -10,12 +10,11 @@ import org.springframework.stereotype.Component;
@RequiredArgsConstructor @RequiredArgsConstructor
@Component @Component
public class StartupRunner implements ApplicationRunner { public class StartupRunner implements ApplicationRunner {
private final Settings settings;
@Override @Override
public void run(ApplicationArguments args) throws Exception { public void run(ApplicationArguments args) throws Exception {
log.debug("run"); log.debug("run");
// 1. dfxagent.yml 로드 및 설정 관리자에 저장 : DfxAgentConfiguration, Settings
// 2. embedded db 체크 및 생성 : DfxAgentConfiguration
// 3. dfxagent.yml 을 기반으로 데이터소스 및 커넥션 풀 생성 : DfxAgentConfiguration, datasource.*
//TODO 4. Worker 쓰레드 생성 //TODO 4. Worker 쓰레드 생성
} }
} }

@ -22,9 +22,9 @@ public class ListenerController {
AckDto ackDto = AckDto.builder().build(); AckDto ackDto = AckDto.builder().build();
try { try {
String bodyString = ServletUtils.getBodyString(request); String bodyString = ServletUtils.getBodyString(request);
ackDto = listenerService.saveNewMessage(bodyString); ackDto = listenerService.receiveMessage(bodyString);
} catch (IOException e) { } catch (IOException e) {
ackDto.setResult("fail"); ackDto.setResult(AckDto.RECEIVE_FAIL);
ackDto.setMessage(e.getMessage()); ackDto.setMessage(e.getMessage());
} }
return ackDto; return ackDto;

@ -7,9 +7,21 @@ import lombok.Setter;
@Getter @Getter
@Setter @Setter
@Builder @Builder
/**
* Listen
* 1. listener
* 2. ( (DropBoxQueue) )
* 3. Ack (result : RECEIVE_SUCCESS)
* 3.1 Ack (result : RECEIVE_FAIL)
* 4. Worker ( )
* 4.1 Worker sender Ack (result : PROCESS_SUCCESS)
* 4.2 Worker sender Ack (result : PROCESS_FAIL)
*/
public class AckDto { public class AckDto {
public static final String RESULT_SUCCESS = "SUCCESS"; public static final String RECEIVE_SUCCESS = "RECEIVE_SUCCESS";
public static final String RESULT_FAIL = "FAIL"; public static final String RECEIVE_FAIL = "RECEIVE_FAIL";
public static final String PROCESS_SUCCESS = "PROCESS_SUCCESS";
public static final String PROCESS_FAIL = "PROCESS_FAIL";
private String result; private String result;
private String message; private String message;
} }

@ -2,19 +2,21 @@ package com.bsmlab.dfx.agent.listener.dto;
import lombok.*; import lombok.*;
import java.io.Serializable;
@Getter @Getter
@Setter @Setter
@Builder @Builder
@AllArgsConstructor @AllArgsConstructor
@NoArgsConstructor @NoArgsConstructor
public class ReceiveMessageDto { public class ReceiveMessageDto implements Serializable {
private String senderHostId; private String senderHostId;
private long senderTimestamp; private long senderTimestamp;
private String messageUuid; private String messageUuid;
private String messageType; private String messageType;
private long receivedTimestamp; private long receivedTimestamp;
private String recipientHostId; private String recipientHostId;
private String recipientDropBox; private String recipientDropBoxId;
private String data; private String data;
private String processStatus; private String processStatus;
} }

@ -1,5 +1,6 @@
package com.bsmlab.dfx.agent.listener.service; package com.bsmlab.dfx.agent.listener.service;
import com.bsmlab.dfx.agent.config.Settings;
import com.bsmlab.dfx.agent.config.datasource.SqlExecuteService; import com.bsmlab.dfx.agent.config.datasource.SqlExecuteService;
import com.bsmlab.dfx.agent.listener.dto.AckDto; import com.bsmlab.dfx.agent.listener.dto.AckDto;
import com.bsmlab.dfx.agent.listener.dto.ListenerMapper; import com.bsmlab.dfx.agent.listener.dto.ListenerMapper;
@ -8,6 +9,7 @@ import com.bsmlab.dfx.agent.support.MessageUtils;
import com.bsmlab.dfx.agent.support.exception.IllegalMessageException; import com.bsmlab.dfx.agent.support.exception.IllegalMessageException;
import com.bsmlab.dfx.agent.support.exception.InCompleteMessageException; import com.bsmlab.dfx.agent.support.exception.InCompleteMessageException;
import com.bsmlab.dfx.agent.support.exception.NullMessageException; import com.bsmlab.dfx.agent.support.exception.NullMessageException;
import com.bsmlab.dfx.agent.task.dropbox.DropBoxService;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -16,6 +18,7 @@ import org.springframework.stereotype.Service;
@Slf4j @Slf4j
@Service @Service
public class ListenerService { public class ListenerService {
private final DropBoxService dropBoxService;
private final ListenerMapper listenerMapper; private final ListenerMapper listenerMapper;
private final SqlExecuteService sqlExecuteService; private final SqlExecuteService sqlExecuteService;
@ -25,17 +28,35 @@ public class ListenerService {
ReceiveMessageDto receiveMessageDto = MessageUtils.toReceiveMessageDto(messageJsonString); ReceiveMessageDto receiveMessageDto = MessageUtils.toReceiveMessageDto(messageJsonString);
int counter = listenerMapper.selectReceiveMessageCountByPk(receiveMessageDto); int counter = listenerMapper.selectReceiveMessageCountByPk(receiveMessageDto);
if(counter > 0) { if(counter > 0) {
ackDto = AckDto.builder().result(AckDto.RESULT_FAIL).message("이전 전송한 메시지 중 중복된 UUID가 존재합니다.").build(); ackDto = AckDto.builder().result(AckDto.RECEIVE_FAIL).message("이전 전송한 메시지 중 중복된 UUID가 존재합니다.").build();
} }
else { else {
listenerMapper.insertReceiveMessage(receiveMessageDto); listenerMapper.insertReceiveMessage(receiveMessageDto);
ackDto = AckDto.builder().result(AckDto.RESULT_SUCCESS).build(); ackDto = AckDto.builder().result(AckDto.RECEIVE_SUCCESS).build();
} }
} catch (IllegalMessageException | NullMessageException | InCompleteMessageException e) { } catch (IllegalMessageException | NullMessageException | InCompleteMessageException e) {
log.error("{}", e, e); log.error("{}", e, e);
ackDto = AckDto.builder().result(AckDto.RESULT_FAIL).message(e.getLocalizedMessage()).build(); ackDto = AckDto.builder().result(AckDto.RECEIVE_FAIL).message(e.getLocalizedMessage()).build();
}
return ackDto;
}
public AckDto receiveMessage(String messageJsonString) {
AckDto ackDto = null;
try {
ReceiveMessageDto receiveMessageDto = MessageUtils.toReceiveMessageDto(messageJsonString);
if(dropBoxService.isExistToday(receiveMessageDto)) {
ackDto = AckDto.builder().result(AckDto.RECEIVE_FAIL).message("금일 전송한 메시지 중 중복된 UUID가 존재합니다.").build();
}
else {
dropBoxService.add(receiveMessageDto);
ackDto = AckDto.builder().result(AckDto.RECEIVE_SUCCESS).build();
}
} catch (IllegalMessageException | NullMessageException | InCompleteMessageException e) {
log.error("{}", e, e);
ackDto = AckDto.builder().result(AckDto.RECEIVE_FAIL).message(e.getLocalizedMessage()).build();
} }
return ackDto; return ackDto;
} }

@ -13,6 +13,7 @@ import org.apache.commons.lang3.EnumUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
@ -118,11 +119,23 @@ public class MessageUtils {
throw new InCompleteMessageException("message-type 값이 옳바르지 않습니다. " + messageType); throw new InCompleteMessageException("message-type 값이 옳바르지 않습니다. " + messageType);
} }
} }
String dataString;
if(map.get("data") == null) {
throw new InCompleteMessageException("data 엘리먼트를 찾을 수 없습니다.");
}
else if(!(map.get("data") instanceof List)) {
throw new InCompleteMessageException("data 엘리먼트의 데이터가 객체타입이 아닙니다.");
}
else {
List<Map<String, Object>> dataList = (List<Map<String, Object>>) map.get("data");
dataString = objectMapper.writeValueAsString(dataList);
}
receiveMessageDto = ReceiveMessageDto.builder() receiveMessageDto = ReceiveMessageDto.builder()
.senderHostId(senderHostId).senderTimestamp(senderTimestamp) .senderHostId(senderHostId).senderTimestamp(senderTimestamp)
.messageUuid(messageUuid).messageType(messageType).receivedTimestamp(receivedTimestamp) .messageUuid(messageUuid).messageType(messageType).receivedTimestamp(receivedTimestamp)
.recipientHostId(recipientHostId).recipientDropBox(recipientDropBoxId) .recipientHostId(recipientHostId).recipientDropBoxId(recipientDropBoxId)
.data(messageJsonString).processStatus(ProcessStatusType.READ.name()) .data(dataString).processStatus(ProcessStatusType.READ.name())
.build(); .build();
} }
catch(JsonProcessingException e) { catch(JsonProcessingException e) {

@ -0,0 +1,52 @@
package com.bsmlab.dfx.agent.task;
import com.bsmlab.dfx.agent.config.Settings;
import com.bsmlab.dfx.agent.config.constant.MessageType;
import com.bsmlab.dfx.agent.config.datasource.SqlExecuteService;
import com.bsmlab.dfx.agent.listener.dto.ReceiveMessageDto;
import com.bsmlab.dfx.agent.task.dropbox.DropBoxDto;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.EnumUtils;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.io.*;
import java.util.List;
import java.util.Map;
@Service
@RequiredArgsConstructor
@Slf4j
public class TaskExecutorService {
private final Settings settings;
private final SqlExecuteService sqlExecuteService;
@Async("threadPoolTaskExecutor")
public void process(String messageFilePath) {
try (ObjectInputStream objectInputStream = new ObjectInputStream(new FileInputStream(messageFilePath))) {
ReceiveMessageDto receiveMessageDto = (ReceiveMessageDto)objectInputStream.readObject();
DropBoxDto dropBoxDto = settings.getDropBoxDto(receiveMessageDto.getRecipientDropBoxId());
if("SAVE_DATA_TABLE".equals(dropBoxDto.getTaskType())) {
//public Map<String, Object> insert(String dataSourceId, String sqlId, Map<String, Object> parameter) {
String dataString = receiveMessageDto.getData();
List<Map<String, Object>> dataMapList = null; //(List<Map<String, Object>>)receiveMessageDto.getData();
ObjectMapper objectMapper = new ObjectMapper();
dataMapList = (List<Map<String, Object>>) objectMapper.readValue(receiveMessageDto.getData(), List.class);
for(Map<String, Object> dataMap : dataMapList) {
sqlExecuteService.insert(dropBoxDto.getDataSourceId(), dropBoxDto.getSqlId(), dataMap);
}
}
else if("RECEIVE_FILE".equals(dropBoxDto.getTaskType())) {
}
} catch (FileNotFoundException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
}

@ -0,0 +1,33 @@
package com.bsmlab.dfx.agent.task;
import com.bsmlab.dfx.agent.task.dropbox.DropBoxService;
import io.micrometer.common.util.StringUtils;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
@Slf4j
public class TaskExecutorStarter {
private final DropBoxService dropBoxService;
private final TaskExecutorService taskExecutorService;
@PostConstruct
public void run() {
while(true) {
try {
Thread.sleep(10);
String messageFilePath = dropBoxService.poll();
if(StringUtils.isNotBlank(messageFilePath)) {
taskExecutorService.process(messageFilePath);
}
} catch (InterruptedException e) {
log.error("{}", e, e);
Thread.currentThread().interrupt();
}
}
}
}

@ -0,0 +1,64 @@
package com.bsmlab.dfx.agent.task.dropbox;
import com.bsmlab.dfx.agent.config.Settings;
import com.bsmlab.dfx.agent.listener.dto.ReceiveMessageDto;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.io.*;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ConcurrentLinkedQueue;
@Service
@RequiredArgsConstructor
@Slf4j
public class DropBoxService {
private final Settings settings;
private ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
public boolean isExistToday(ReceiveMessageDto receiveMessageDto) {
boolean isExist = false;
File root = new File(settings.getMessageStorageRootPath());
Date today = new Date(System.currentTimeMillis());
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd/");
String todayDirectoryString = root.getAbsolutePath() + "/" + dateFormat.format(today);
DecimalFormat decimalFormat = new DecimalFormat("00");
for(int i = 0; i < 24; i++) {
File targetFilePath = new File(todayDirectoryString + decimalFormat.format(i) + "/" + receiveMessageDto.getMessageUuid());
isExist = targetFilePath.exists();
if(isExist) {
break;
}
}
return isExist;
}
public void add(ReceiveMessageDto receiveMessageDto) {
File root = new File(settings.getMessageStorageRootPath());
Date today = new Date(System.currentTimeMillis());
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd/hh");
String targetDirectoryString = root.getAbsolutePath() + "/" + dateFormat.format(today);
File targetDirectory = new File(targetDirectoryString);
if(!targetDirectory.exists()) {
targetDirectory.mkdirs();
}
String targetFilePath = targetDirectoryString + "/" + receiveMessageDto.getMessageUuid();
try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(targetFilePath)))) {
objectOutputStream.writeObject(receiveMessageDto);
this.queue.add(targetFilePath);
}
catch (FileNotFoundException e) {
throw new RuntimeException(e);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
public String poll() {
return queue.poll();
}
}
Loading…
Cancel
Save