DropBox 처리 쓰레드 수 설정 기능 추가

Postman 처리 쓰레드 수 설정 기능 추가
기동시 미처리 메시지 처리 로직 추가 - bsm-lab/dfxagent#8
alive 메시지 로직 확인하여야 함
main
semin.baek 8 months ago
parent 57a45a0496
commit b118ce4071

@ -1,4 +1,4 @@
#!/bin/sh
AGENT_HOME=/home/dfxagent/agent
TODAY=$(date "+%Y%m%d")
java -jar $AGENT_HOME/lib/dfxagent.jar --setting.file=$AGENT_HOME/conf/settings.json &
java -jar $AGENT_HOME/lib/dfxagent.jar -Xms2048m -Xmx8192m --setting.file=$AGENT_HOME/conf/settings.json &

@ -44,6 +44,8 @@
"receivedMessageStorageRoot": "D:/projects/bsm-lab/dfx/dfxagent/src/docs/messages/received",
"processedMessageStorageRoot": "D:/projects/bsm-lab/dfx/dfxagent/src/docs/messages/processed",
"failureMessageStorageRoot": "D:/projects/bsm-lab/dfx/dfxagent/src/docs/messages/failure",
"threadPoolSize": 300,
"retentionDaysOfProcessedMessage": 60,
"dropBoxList": [
{
"dropBoxId": "db-bd-cubrid-save",
@ -53,6 +55,8 @@
}
]
},
"postman": [
]
"postmanConfig": {
"threadPoolSize": 30,
"postmanList": []
}
}

@ -15,7 +15,7 @@ public class AgentConfigDto {
private List<DataSourceConfig> dataSourceConfig;
private List<String> sqlMapperLocations;
private DropBoxConfig dropBox;
private List<Postman> postman;
private PostmanConfig postmanConfig;
private LoggingConfig logging;
@ -48,6 +48,8 @@ public class AgentConfigDto {
private String processedMessageStorageRoot;
private String failureMessageStorageRoot;
private List<DropBox> dropBoxList;
private int threadPoolSize;
private int retentionDaysOfProcessedMessage;
}
@Data
@ -59,6 +61,12 @@ public class AgentConfigDto {
private String saveDirectoryRoot;
}
@Data
public static class PostmanConfig {
private int threadPoolSize;
private List<Postman> postmanList;
}
@Data
public static class Postman {
private String postmanId;

@ -72,7 +72,7 @@ public class AgentConfigReader {
public List<AgentConfigDto.Postman> getScheduledTypePostmanList() {
List<AgentConfigDto.Postman> postmanList = null;
for(AgentConfigDto.Postman postman : this.agentConfigDto.getPostman()) {
for(AgentConfigDto.Postman postman : this.agentConfigDto.getPostmanConfig().getPostmanList()) {
if(AgentConfigDto.ActionType.SCHEDULED == postman.getAction().getType()) {
if(postmanList == null) {
postmanList = new ArrayList<>();

@ -7,9 +7,6 @@ import ch.qos.logback.classic.encoder.PatternLayoutEncoder;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.rolling.RollingFileAppender;
import ch.qos.logback.core.rolling.TimeBasedRollingPolicy;
import com.bsmlab.dfx.agent.config.datasource.DynamicDataSourceService;
import com.bsmlab.dfx.agent.task.dropbox.DropBoxTaskExecutorService;
import com.bsmlab.dfx.agent.task.postman.PostmanSchedulerService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.LoggerFactory;
@ -24,17 +21,12 @@ import java.util.Map;
@Component
@RequiredArgsConstructor
public class BeanInitializer implements SmartInitializingSingleton {
private final DfxAgentConfiguration dfxAgentConfiguration;
private final DynamicDataSourceService dynamicDataSourceService;
private final PostmanSchedulerService postmanSchedulerService;
private final DropBoxTaskExecutorService dropBoxTaskExecutorService;
private final AgentConfigReader agentConfigReader;
@Override
public void afterSingletonsInstantiated() {
System.out.println("🔥 BeanInitializer afterSingletonsInstantiated() 진입!");
log.info("✅ BeanInitializer 작동 확인");
this.configureLogback();
log.info("✅ logger 설정 완료");
}
private void configureLogback() {

@ -179,35 +179,22 @@ public class DfxAgentConfiguration {
return dynamicDataSourceService;
}
// (수신 처리) 메시지 수신 - 저장 후 수신 메시지 처리 쓰레드 설정
@Bean(name = "dropBoxProcessorThreadPoolTaskExecutor")
public ThreadPoolTaskExecutor dropBoxProcessorThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor dropBoxProcessorThreadPoolTaskExecutor = new ThreadPoolTaskExecutor();
dropBoxProcessorThreadPoolTaskExecutor.setCorePoolSize(30); // 최소 쓰레드
dropBoxProcessorThreadPoolTaskExecutor.setMaxPoolSize(300); // 최대 쓰레드
dropBoxProcessorThreadPoolTaskExecutor.setQueueCapacity(300); // 대기 큐
dropBoxProcessorThreadPoolTaskExecutor.setThreadNamePrefix("dropBoxProcessor-");
dropBoxProcessorThreadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true); // 종료 시 대기 여부
dropBoxProcessorThreadPoolTaskExecutor.initialize();
return dropBoxProcessorThreadPoolTaskExecutor;
}
// (수신 처리) 메시지 수신 - 저장 후 수신 메시지 처리 쓰레드 설정
@Bean(name = "dropBoxProcessorThreadPoolTaskScheduler")
public ThreadPoolTaskScheduler dropBoxProcessorThreadPoolTaskScheduler() {
public ThreadPoolTaskScheduler dropBoxProcessorThreadPoolTaskScheduler(AgentConfigReader agentConfigReader) {
ThreadPoolTaskScheduler dropBoxProcessorThreadPoolTaskScheduler = new ThreadPoolTaskScheduler();
dropBoxProcessorThreadPoolTaskScheduler.setPoolSize(300);
dropBoxProcessorThreadPoolTaskScheduler.setPoolSize(agentConfigReader.getAgentConfigDto().getDropBox().getThreadPoolSize());
dropBoxProcessorThreadPoolTaskScheduler.setThreadNamePrefix("dropBoxProcessor-");
dropBoxProcessorThreadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(true); // 종료 시 대기 여부
dropBoxProcessorThreadPoolTaskScheduler.initialize();
return dropBoxProcessorThreadPoolTaskScheduler;
}
// (송신 처리) 메시지 송신 쓰레드 설정(삭제 예정)
// (송신 처리) 메시지 송신 쓰레드 설정
@Bean(name = "scheduledPostmanThreadPoolTaskScheduler")
public ThreadPoolTaskScheduler scheduledPostmanThreadPoolTaskScheduler() { // 실행확인됨
public ThreadPoolTaskScheduler scheduledPostmanThreadPoolTaskScheduler(AgentConfigReader agentConfigReader) { // 실행확인됨
ThreadPoolTaskScheduler scheduledPostmanThreadPoolTaskScheduler = new ThreadPoolTaskScheduler();
scheduledPostmanThreadPoolTaskScheduler.setPoolSize(10);
scheduledPostmanThreadPoolTaskScheduler.setPoolSize(agentConfigReader.getAgentConfigDto().getPostmanConfig().getThreadPoolSize());
scheduledPostmanThreadPoolTaskScheduler.setThreadNamePrefix("postman-scheduler-");
scheduledPostmanThreadPoolTaskScheduler.initialize();
return scheduledPostmanThreadPoolTaskScheduler;

@ -1,7 +1,8 @@
package com.bsmlab.dfx.agent.config;
import com.bsmlab.dfx.agent.config.datasource.DynamicDataSourceService;
import com.bsmlab.dfx.agent.task.dropbox.DropBoxTaskExecutorService;
import com.bsmlab.dfx.agent.task.dropbox.DropBoxService;
import com.bsmlab.dfx.agent.task.dropbox.DropBoxSchedulerService;
import com.bsmlab.dfx.agent.task.postman.PostmanSchedulerService;
import com.bsmlab.dfx.agent.task.status.StatusCheckerSchedulerService;
import lombok.RequiredArgsConstructor;
@ -17,22 +18,29 @@ public class StartupRunner implements ApplicationRunner {
private final DfxAgentConfiguration dfxAgentConfiguration;
private final DynamicDataSourceService dynamicDataSourceService;
private final PostmanSchedulerService postmanSchedulerService;
private final DropBoxTaskExecutorService dropBoxTaskExecutorService;
private final DropBoxSchedulerService dropBoxSchedulerService;
private final StatusCheckerSchedulerService statusCheckerSchedulerService;
private final DropBoxService dropBoxService;
@Override
public void run(ApplicationArguments args) throws Exception {
log.debug("✅ StartupRunner start [run]");
log.info("✅ StartupRunner 기동");
// DfxAgentConfiguration 에서 생성한 빈 중 DataSource 관련 설정을 마무리한다.
dynamicDataSourceService.setSqlSessionFactoryMap(dfxAgentConfiguration.getTemporarySqlSessionFactoryMap());
dynamicDataSourceService.setTransactionManagerMap(dfxAgentConfiguration.getTemporaryTransactionManagerMap());
//4. Worker 쓰레드 생성
// 송신 메시지 전송 서비스
log.info("✅ PostmanSchedulerService 기동");
postmanSchedulerService.launch();
// 수신한 메시지 처리 서비스
dropBoxTaskExecutorService.launch();
log.info("✅ DropBoxTaskExecutorService 기동");
dropBoxSchedulerService.launch();
// 다른 agent 상태 확인 서비스
log.info("✅ StatusCheckerSchedulerService 기동");
statusCheckerSchedulerService.launch();
// receivedMessageStorageRoot 하위에 처리되지 않은 메시지를 처리 queue에 넣는다.
int messageCount = dropBoxService.addNotProcessedMessageFile();
log.info("✅ 미처리 메시지 체크 - {} 건 처리 등록", messageCount);
log.debug("✅ StartupRunner end");
}
}

@ -1,9 +1,5 @@
package com.bsmlab.dfx.agent.event;
import com.bsmlab.dfx.agent.config.DfxAgentConfiguration;
import com.bsmlab.dfx.agent.config.datasource.DynamicDataSourceService;
import com.bsmlab.dfx.agent.task.dropbox.DropBoxTaskExecutorService;
import com.bsmlab.dfx.agent.task.postman.PostmanSchedulerService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
@ -14,28 +10,10 @@ import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
public class ContextReadyListener implements ApplicationListener<ContextRefreshedEvent> {
private final DfxAgentConfiguration dfxAgentConfiguration;
private final DynamicDataSourceService dynamicDataSourceService;
private final PostmanSchedulerService postmanSchedulerService;
private final DropBoxTaskExecutorService dropBoxTaskExecutorService;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
System.out.println("🔥 ContextReadyListener onApplicationEvent() 진입!");
log.info("✅ ContextReadyListener 작동 확인");
/*
// DfxAgentConfiguration 에서 생성한 빈 중 DataSource 관련 설정을 마무리한다.
dynamicDataSourceService.setSqlSessionFactoryMap(dfxAgentConfiguration.getTemporarySqlSessionFactoryMap());
dynamicDataSourceService.setTransactionManagerMap(dfxAgentConfiguration.getTemporaryTransactionManagerMap());
// PostmanSchedulerService, DropBoxTaskExecutorService 는 @RequiredArgsConstructor 이기 때문에 자동으로 injection 된다
// 그 후 @PostConstruct 로직이 실행될 것이다.
log.debug("ContextReadyListener onApplicationEvent [run]");
log.debug("{} ready", postmanSchedulerService.getClass().getName());
log.debug("{} ready", dropBoxTaskExecutorService.getClass().getName());
//TODO 4. Worker 쓰레드 생성
*/
}

@ -71,7 +71,7 @@ public class ListenerController {
AckDto ackDto;
try {
String bodyString= ServletUtils.getBodyString(request);
ackDto = listenerService.receiveAck(bodyString);
ackDto = listenerService.receiveCommand(bodyString);
} catch (IOException e) {
ackDto = AckDto.builder().result(AckDto.ResultType.RECEIVE_FAIL).resultText(e.getLocalizedMessage()).messageUuid("").build();
}

@ -14,10 +14,7 @@ import org.apache.commons.lang3.EnumUtils;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Service;
import org.springframework.web.client.ResourceAccessException;
import org.springframework.web.client.RestTemplate;
@ -27,7 +24,6 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
@ -35,8 +31,7 @@ import java.util.concurrent.ScheduledFuture;
@Service
@RequiredArgsConstructor
@Slf4j
public class DropBoxTaskExecutorService {
private final ThreadPoolTaskExecutor dropBoxProcessorThreadPoolTaskExecutor;
public class DropBoxSchedulerService {
private final ThreadPoolTaskScheduler dropBoxProcessorThreadPoolTaskScheduler;
private final AgentConfigReader agentConfigReader;
private final SqlExecuteService sqlExecuteService;
@ -48,7 +43,6 @@ public class DropBoxTaskExecutorService {
* @param messageFilePath
*/
@SuppressWarnings("unchecked")
// @Async("dropBoxProcessorThreadPoolTaskExecutor")
public void processDropBox(String messageFilePath) {
ReceiveMessageDto receiveMessageDto = ReceiveMessageDto.builder().build();
ReceiveMessageDto.ProcessStatus processStatus;
@ -176,28 +170,9 @@ public class DropBoxTaskExecutorService {
}
}
public void launch() { // 실행확인됨
log.debug("{} launch", this.getClass().getName());
ScheduledFuture<?> scheduledFuture = dropBoxProcessorThreadPoolTaskScheduler.scheduleWithFixedDelay(this::run, Duration.ofMillis(10));
/*
Runnable runnable = () -> {
try {
log.info("dropBoxProcessThread run {}", Thread.currentThread().getName());
String messageFilePath = dropBoxService.poll();
if(StringUtils.isNotBlank(messageFilePath)) {
this.processDropBox(messageFilePath);
}
Thread.sleep(10);
} catch (InterruptedException e) {
log.error("{}", e, e);
Thread.currentThread().interrupt();
}
};
for(int i = 0; i < this.dropBoxProcessorThreadPoolTaskExecutor.getCorePoolSize(); i++) {
this.dropBoxProcessorThreadPoolTaskExecutor.execute(runnable);
}
*/
}
}

@ -13,7 +13,9 @@ import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
@Service
@ -75,6 +77,37 @@ public class DropBoxService {
}
}
/**
* . queue .
*/
public int addNotProcessedMessageFile() {
File root = new File(agentConfigReader.getAgentConfigDto().getDropBox().getReceivedMessageStorageRoot());
List<File> fileList = new ArrayList<>();
this.findAndAddMessageFile(root, fileList);
for(File file : fileList) {
this.queue.add(file.getAbsolutePath());
}
return fileList.size();
}
private void findAndAddMessageFile(File parentDirectory, List<File> fileList) {
File[] filesOnDirectory = parentDirectory.listFiles();
if(filesOnDirectory != null) {
for(File file : filesOnDirectory) {
if(file.canRead() && !".".equals(file.getName()) && !"..".equals(file.getName())) {
if(file.isFile()) {
if(!this.queue.contains(file.getAbsolutePath())) {
fileList.add(file);
}
}
else if(file.isDirectory()) {
findAndAddMessageFile(file, fileList);
}
}
}
}
}
/**
* (queue) .
* @return ReceiveMessageDto serialize

Loading…
Cancel
Save