diff --git a/src/docs/datasource작업중.md b/src/docs/datasource작업중.md deleted file mode 100644 index 1e52b4a..0000000 --- a/src/docs/datasource작업중.md +++ /dev/null @@ -1,216 +0,0 @@ -### ✅ **Spring Boot에서 동적으로 N개의 데이터소스를 추가하고, 특정 이름에 따른 Connection Pool로 관리하기** -> 🚀 **요구사항 업데이트:** -> 1. **각 데이터소스는 특정한 이름(`dataSourceName`)으로 관리**. -> 2. **각 데이터소스마다 개별적인 Connection Pool을 사용**. -> 3. **동적으로 데이터소스를 추가할 수 있어야 함**. -> 4. **각 데이터소스는 `TransactionManager`에 의해 관리됨**. -> 5. **MyBatis와 연동**. - ---- - -## **🔹 1. 데이터소스를 Connection Pool로 관리하는 `DynamicRoutingDataSource` 구현** -각 데이터소스는 **Connection Pool(DBCP2)를 사용하여 동적으로 추가**하며, -각 요청 시 **ThreadLocal을 이용하여 특정 데이터소스를 선택**할 수 있도록 `AbstractRoutingDataSource`를 활용합니다. - -📄 **`DynamicRoutingDataSource.java`** -```java -import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource; -import javax.sql.DataSource; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -public class DynamicRoutingDataSource extends AbstractRoutingDataSource { - - private static final ThreadLocal contextHolder = new ThreadLocal<>(); - private final Map dataSourceMap = new ConcurrentHashMap<>(); - - @Override - protected Object determineCurrentLookupKey() { - return contextHolder.get(); - } - - public void setDataSource(String dataSourceName) { - contextHolder.set(dataSourceName); - } - - public void clearDataSource() { - contextHolder.remove(); - } - - public void addDataSource(String name, DataSource dataSource) { - dataSourceMap.put(name, dataSource); - super.setTargetDataSources(dataSourceMap); - super.afterPropertiesSet(); - } - - public Map getDataSourceMap() { - return dataSourceMap; - } -} -``` -✅ **기능:** -- `setDataSource("dataSourceName")`을 호출하면 **해당 데이터소스를 선택**하여 사용. -- `addDataSource("dataSourceName", DataSource)`을 통해 **새로운 데이터소스를 동적으로 추가 가능**. -- `ConcurrentHashMap`을 사용하여 **Connection Pool을 효율적으로 관리**. - ---- - -## **🔹 2. 데이터소스 설정 및 Connection Pool 관리** -각 데이터소스는 **DBCP2 커넥션 풀**을 사용하여 생성됩니다. - -📄 **`DynamicDataSourceConfig.java`** -```java -import org.apache.ibatis.session.SqlSessionFactory; -import org.apache.ibatis.session.SqlSessionTemplate; -import org.apache.ibatis.datasource.pooled.PooledDataSource; -import org.mybatis.spring.SqlSessionFactoryBean; -import org.mybatis.spring.annotation.MapperScan; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.jdbc.datasource.DataSourceTransactionManager; - -import javax.sql.DataSource; -import java.util.HashMap; -import java.util.Map; - -@Configuration -@MapperScan(basePackages = "com.example.mapper", sqlSessionTemplateRef = "dynamicSqlSessionTemplate") -public class DynamicDataSourceConfig { - - @Bean - public DynamicRoutingDataSource dynamicRoutingDataSource() { - DynamicRoutingDataSource dynamicRoutingDataSource = new DynamicRoutingDataSource(); - - // 기본 데이터소스 등록 - DataSource defaultDataSource = createDataSource("jdbc:mysql://localhost:3306/default_db", "root", "password"); - dynamicRoutingDataSource.addDataSource("default", defaultDataSource); - - return dynamicRoutingDataSource; - } - - @Bean - public DataSourceTransactionManager transactionManager(DynamicRoutingDataSource dynamicRoutingDataSource) { - return new DataSourceTransactionManager(dynamicRoutingDataSource); - } - - @Bean - public SqlSessionFactory sqlSessionFactory(DynamicRoutingDataSource dynamicRoutingDataSource) throws Exception { - SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean(); - sessionFactory.setDataSource(dynamicRoutingDataSource); - return sessionFactory.getObject(); - } - - @Bean - public SqlSessionTemplate dynamicSqlSessionTemplate(SqlSessionFactory sqlSessionFactory) { - return new SqlSessionTemplate(sqlSessionFactory); - } - - private DataSource createDataSource(String url, String username, String password) { - // DBCP2 Connection Pool 사용 - PooledDataSource dataSource = new PooledDataSource(); - dataSource.setDriver("com.mysql.cj.jdbc.Driver"); - dataSource.setUrl(url); - dataSource.setUsername(username); - dataSource.setPassword(password); - return dataSource; - } -} -``` -✅ **기능:** -- `createDataSource()`를 사용하여 **각각의 데이터소스를 DBCP2 기반 Connection Pool로 생성**. -- `dynamicRoutingDataSource.addDataSource("default", defaultDataSource);`를 호출하여 기본 데이터소스를 등록. - ---- - -## **🔹 3. 새로운 데이터소스를 동적으로 추가하는 서비스** -📄 **`DynamicDataSourceService.java`** -```java -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; -import javax.sql.DataSource; - -@Service -public class DynamicDataSourceService { - - @Autowired - private DynamicRoutingDataSource dynamicRoutingDataSource; - - public void addNewDataSource(String dataSourceName, String url, String username, String password) { - DataSource newDataSource = createDataSource(url, username, password); - dynamicRoutingDataSource.addDataSource(dataSourceName, newDataSource); - System.out.println("✅ 새로운 데이터소스 추가됨: " + dataSourceName); - } - - private DataSource createDataSource(String url, String username, String password) { - PooledDataSource dataSource = new PooledDataSource(); - dataSource.setDriver("com.mysql.cj.jdbc.Driver"); - dataSource.setUrl(url); - dataSource.setUsername(username); - dataSource.setPassword(password); - return dataSource; - } -} -``` -✅ **기능:** -- `addNewDataSource("newDB", "jdbc:mysql://localhost:3306/new_db", "user", "pass")`을 호출하면 **새로운 데이터소스를 동적으로 추가**. -- `dynamicRoutingDataSource.addDataSource(dataSourceName, newDataSource)`을 사용하여 **데이터소스를 관리하는 Connection Pool에 추가**. - ---- - -## **🔹 4. 특정 데이터소스를 선택하여 트랜잭션 처리** -📄 **`UserService.java`** -```java -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; - -@Service -public class UserService { - - @Autowired - private UserMapper userMapper; - - public void fetchUsersFromDefault() { - DynamicRoutingDataSource.setDataSource("default"); - System.out.println("🔹 Default DB에서 데이터 조회"); - System.out.println(userMapper.findAll()); - } - - @Transactional - public void fetchUsersFromNewDB() { - DynamicRoutingDataSource.setDataSource("newDB"); - System.out.println("🔹 New DB에서 데이터 조회"); - System.out.println(userMapper.findAll()); - } -} -``` -📄 **`UserMapper.java`** -```java -import org.apache.ibatis.annotations.Mapper; -import org.apache.ibatis.annotations.Select; -import java.util.List; -import java.util.Map; - -@Mapper -public interface UserMapper { - @Select("SELECT * FROM users") - List> findAll(); -} -``` -✅ **기능:** -- `DynamicRoutingDataSource.setDataSource("newDB")`를 호출하면 **특정 데이터소스를 선택**. -- MyBatis에서 **선택된 데이터소스를 기반으로 SQL 실행**. -- `@Transactional`을 사용하여 **선택된 데이터소스에서 트랜잭션 처리** 가능. - ---- - -## ✅ **최종 정리** -| 기능 | 코드 | -|------|------| -| **동적으로 데이터소스 변경** | `DynamicRoutingDataSource.setDataSource("dataSourceName")` | -| **새로운 데이터소스 추가** | `dynamicDataSourceService.addNewDataSource("newDB", "url", "user", "pass")` | -| **Connection Pool 관리** | `PooledDataSource` (MyBatis DBCP2 사용) | -| **트랜잭션 관리** | `@Transactional` 사용 | -| **MyBatis 연동** | `SqlSessionFactory` + `SqlSessionTemplate` 사용 | - -✅ **이제 애플리케이션 실행 중에도 N개의 데이터소스를 추가하고, Connection Pool로 관리하며, MyBatis와 연동하여 사용할 수 있습니다!** 🚀 \ No newline at end of file diff --git a/src/docs/settings-examples/dfxagent.json b/src/docs/settings-examples/dfxagent.json index f5566df..cf3c2cb 100644 --- a/src/docs/settings-examples/dfxagent.json +++ b/src/docs/settings-examples/dfxagent.json @@ -15,16 +15,16 @@ "drop-box-id-list": ["drop3", "drop4"] } ], - "datasource-config": [ + "data-source-config": [ { - "dataSourceId": "dfcms", + "data-source-id": "dfcms", "driverClassName": "org.postgresql.Driver", "url": "jdbc:postgresql://bsm-lab.com:5432/defree?currentSchema=DFCMS", "username": "defreeadmin", "password": "qortpals1!" }, { - "dataSourceId": "mochastory", + "data-source-id": "mochastory", "driverClassName": "com.mysql.jdbc.Driver", "url": "jdbc:mysql://bsm-lab.com:3306/MOCHASTORY?allowPublicKeyRetrieval=true", "username": "MOCHASTORY", @@ -39,8 +39,8 @@ "drop-box-list": [ { "drop-box-id": "save-violation-history", - "task-type": "SAVE_DATA_TABLE", - "dataSourceId": "dfcms", + "task-type": "RECEIVE_DB_TO_DB_SAVE", + "data-source-id": "dfcms", "sql-id": "dfcms.violation.insertViolationHistory" }, { @@ -50,8 +50,8 @@ }, { "drop-box-id": "save-work-image-file-info", - "task-type": "SAVE_DATA_TABLE", - "dataSourceId": "dfcms", + "task-type": "RECEIVE_DB_TO_DB_SAVE", + "data-source-id": "dfcms", "sql-id": "dfcms.file.insertFileInfo" } ] @@ -66,8 +66,8 @@ "parameters-key-list": ["REG_DATE_FROM", "REG_DATE_TO"] }, "message": { - "message-type": "SAVE_DB_DATA", - "dataSourceId": "dfcms", + "message-type": "TRANSFER_DB_TO_DB", + "data-source-id": "dfcms", "sql-id": "dfcms.selectSome" }, "recipient-host-id": "third-agent", @@ -79,16 +79,34 @@ "task-type": "DB_READ_THEN_SEND", "action": { "type": "SCHEDULED", - "cron": "1 0 3 * * *" + "cron": "0 0 3 * * *" }, "message": { - "message-type": "SAVE_DB_DATA", - "dataSourceId": "dfcms", + "message-type": "TRANSFER_DB_TO_DB", + "data-source-id": "dfcms", "sql-id": "dfcms.selectSome" }, "recipient-host-id": "third-agent", "recipient-drop-box-id": "drop3", "routing-host-id-list": ["first-agent", "second-agent", "third-agent"] + }, + { + "postman-id": "postman3", + "task-type": "FILE_READ_THEN_SEND", + "action": { + "type": "SCHEDULED", + "cron": "0 1 3 * * *" + }, + "message": { + "message-type": "TRANSFER_FILE", + "watch-directory": "D:\\projects\\bsm-lab\\dfx\\run\\send_file", + "meta-data-data-source-id": "save-work-image-file-info", + "meta-data-sql-id": "dfcms.file.selectFileInfo", + "meta-drop-box-id": "save-work-image-file-info" + }, + "recipient-host-id": "third-agent", + "recipient-drop-box-id": "drop4", + "routing-host-id-list": ["first-agent", "second-agent", "third-agent"] } ] } diff --git a/src/docs/settings-examples/dfxagent.yml b/src/docs/settings-examples/dfxagent.yml deleted file mode 100644 index ba716e1..0000000 --- a/src/docs/settings-examples/dfxagent.yml +++ /dev/null @@ -1,12 +0,0 @@ -datasource: - dfcms: - driverClassName: org.postgresql.Driver - url: jdbc:postgresql://bsm-lab.com:5432/defree?currentSchema=DFCMS - username: defreeadmin - password: qortpals1! - mochastory: - driverClassName: com.mysql.jdbc.Driver - url: jdbc:mysql://bsm-lab.com:3306/MOCHASTORY?allowPublicKeyRetrieval=true - username: MOCHASTORY - password: MOCHASTORY - diff --git a/src/main/java/com/bsmlab/dfx/agent/config/AgentConfigDto.java b/src/main/java/com/bsmlab/dfx/agent/config/AgentConfigDto.java index 173c5a8..30608e4 100644 --- a/src/main/java/com/bsmlab/dfx/agent/config/AgentConfigDto.java +++ b/src/main/java/com/bsmlab/dfx/agent/config/AgentConfigDto.java @@ -73,6 +73,10 @@ public class AgentConfigDto { private MessageType messageType; private String dataSourceId; private String sqlId; + private String watchDirectory; + private String metaDataDataSourceId; + private String metaDataSqlId; + private String metaDropBoxId; } public static enum ActionType { @@ -81,13 +85,14 @@ public class AgentConfigDto { } public static enum MessageType { - SAVE_DB_DATA, - SAVE_FILE; + TRANSFER_DB_TO_DB, + TRANSFER_FILE; } public static enum TaskType { DB_READ_THEN_SEND, - SAVE_DB_TABLE, + FILE_READ_THEN_SEND, + RECEIVE_DB_TO_DB_SAVE, RECEIVE_FILE } diff --git a/src/main/java/com/bsmlab/dfx/agent/config/DfxAgentConfiguration.java b/src/main/java/com/bsmlab/dfx/agent/config/DfxAgentConfiguration.java index 8b8047a..af106c0 100644 --- a/src/main/java/com/bsmlab/dfx/agent/config/DfxAgentConfiguration.java +++ b/src/main/java/com/bsmlab/dfx/agent/config/DfxAgentConfiguration.java @@ -59,6 +59,7 @@ public class DfxAgentConfiguration { private Map temporarySqlSessionFactoryMap = new HashMap<>(); Map temporaryTransactionManagerMap = new HashMap<>(); + // agent 설정 관리자. 대부분의 기능에 필요함 @Bean(name = "agentConfigReader") public AgentConfigReader agentConfigReader() { if(StringUtils.isBlank(this.embeddedDbFileDirectory)) { @@ -76,6 +77,7 @@ public class DfxAgentConfiguration { return agentConfigReader; } + // 다중 데이터 소스 생성 @Bean(name = "dynamicRoutingDataSource") public DynamicRoutingDataSource dynamicRoutingDataSource(AgentConfigReader agentConfigReader) { DynamicRoutingDataSource dynamicRoutingDataSource = new DynamicRoutingDataSource(); @@ -135,6 +137,7 @@ public class DfxAgentConfiguration { return dataSource; } + // 다중 데이터 소스와 그에 해당하는 sqlSession, transactionManager 설정 @Bean(name = "dynamicDataSourceService") public DynamicDataSourceService dynamicDataSourceService() { DynamicDataSourceService dynamicDataSourceService = new DynamicDataSourceService(); @@ -143,6 +146,7 @@ public class DfxAgentConfiguration { return dynamicDataSourceService; } + // 연계 처리 결과 관리할 내부 DB 사용여부 결정 못함 public void copyEmbeddedDbFileIfNotExists() { String embeddedDbFileDirectory = System.getProperty("embedded.db.file.directory"); File targetDirectory = new File(embeddedDbFileDirectory); @@ -164,12 +168,14 @@ public class DfxAgentConfiguration { } } + // 연계 처리 결과 관리할 내부 DB 사용여부 결정 못함 @Bean(name = "innerDataSource") @ConfigurationProperties("spring.datasource") // application.yml 에서 설정된 값 자동 적용 public DataSource innerDataSource(DataSourceProperties dataSourceProperties) { return dataSourceProperties.initializeDataSourceBuilder().build(); } + // 연계 처리 결과 관리할 내부 DB 사용여부 결정 못함 @Bean(name = "innerSqlSessionFactory") public SqlSessionFactory innerSqlSessionFactory(@Qualifier("innerDataSource") DataSource dataSource) throws Exception { SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean(); @@ -180,104 +186,38 @@ public class DfxAgentConfiguration { return sqlSessionFactoryBean.getObject(); } + // 연계 처리 결과 관리할 내부 DB 사용여부 결정 못함 @Bean(name = "innerSqlSessionTemplate") public SqlSessionTemplate innerSqlSessionTemplate(@Qualifier("innerSqlSessionFactory") SqlSessionFactory innerSqlSessionFactory) { return new SqlSessionTemplate(innerSqlSessionFactory); } + // 연계 처리 결과 관리할 내부 DB 사용여부 결정 못함 @Bean(name = "innerTransactionManager") public DataSourceTransactionManager innerTransactionManager(@Qualifier("innerDataSource") DataSource dataSource) { return new DataSourceTransactionManager(dataSource); } + // (수신 처리) 메시지 수신 - 저장 후 수신 메시지 처리 쓰레드 설정 @Bean(name = "dropBoxProcessorThreadPoolTaskExecutor") public Executor dropBoxProcessorThreadPoolTaskExecutor() { ThreadPoolTaskExecutor dropBoxProcessorThreadPoolTaskExecutor = new ThreadPoolTaskExecutor(); dropBoxProcessorThreadPoolTaskExecutor.setCorePoolSize(30); // 최소 쓰레드 dropBoxProcessorThreadPoolTaskExecutor.setMaxPoolSize(300); // 최대 쓰레드 dropBoxProcessorThreadPoolTaskExecutor.setQueueCapacity(300); // 대기 큐 - dropBoxProcessorThreadPoolTaskExecutor.setThreadNamePrefix("dfxExecutor-"); + dropBoxProcessorThreadPoolTaskExecutor.setThreadNamePrefix("dropBoxProcessor-"); dropBoxProcessorThreadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true); // 종료 시 대기 여부 dropBoxProcessorThreadPoolTaskExecutor.initialize(); return dropBoxProcessorThreadPoolTaskExecutor; } + // (송신 처리) 메시지 송신 쓰레드 설정 @Bean(name = "scheduledPostmanThreadPoolTaskScheduler") public ThreadPoolTaskScheduler scheduledPostmanThreadPoolTaskScheduler() { ThreadPoolTaskScheduler scheduledPostmanThreadPoolTaskScheduler = new ThreadPoolTaskScheduler(); scheduledPostmanThreadPoolTaskScheduler.setPoolSize(10); - scheduledPostmanThreadPoolTaskScheduler.setThreadNamePrefix("dynamic-scheduler-"); + scheduledPostmanThreadPoolTaskScheduler.setThreadNamePrefix("postman-scheduler-"); scheduledPostmanThreadPoolTaskScheduler.initialize(); return scheduledPostmanThreadPoolTaskScheduler; - /** - import org.springframework.scheduling.Trigger; - import org.springframework.scheduling.TriggerContext; - import org.springframework.scheduling.support.CronTrigger; - import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; - import org.springframework.stereotype.Service; - - import java.util.Date; - import java.util.concurrent.ScheduledFuture; - - @Service - public class DynamicSchedulerService { - - private final ThreadPoolTaskScheduler taskScheduler; - private ScheduledFuture scheduledFuture; - private String currentCron = "0/10 * * * * *"; // 10초마다 - - public DynamicSchedulerService(ThreadPoolTaskScheduler taskScheduler) { - this.taskScheduler = taskScheduler; - } - - public void startDynamicTask() { - scheduledFuture = taskScheduler.schedule(this::runTask, new CronTrigger(currentCron)); - } - - public void updateCron(String newCron) { - stopTask(); - this.currentCron = newCron; - startDynamicTask(); - } - - public void stopTask() { - if (scheduledFuture != null) { - scheduledFuture.cancel(false); - } - } - - private void runTask() { - System.out.println("작업 실행: " + new Date()); - // 실제 작업 로직 - } - } - - @RestController - @RequiredArgsConstructor - public class ScheduleController { - - private final DynamicSchedulerService schedulerService; - - @PostMapping("/start") - public String start() { - schedulerService.startDynamicTask(); - return "스케줄 시작"; - } - - @PostMapping("/update") - public String update(@RequestParam String cron) { - schedulerService.updateCron(cron); - return "크론 변경됨: " + cron; - } - - @PostMapping("/stop") - public String stop() { - schedulerService.stopTask(); - return "스케줄 정지됨"; - } - } - - - */ } } diff --git a/src/main/java/com/bsmlab/dfx/agent/listener/service/ListenerService.java b/src/main/java/com/bsmlab/dfx/agent/listener/service/ListenerService.java index a753a36..0f913c0 100644 --- a/src/main/java/com/bsmlab/dfx/agent/listener/service/ListenerService.java +++ b/src/main/java/com/bsmlab/dfx/agent/listener/service/ListenerService.java @@ -8,7 +8,6 @@ import com.bsmlab.dfx.agent.support.MessageUtils; import com.bsmlab.dfx.agent.support.exception.IllegalMessageException; import com.bsmlab.dfx.agent.support.exception.InCompleteMessageException; import com.bsmlab.dfx.agent.support.exception.NullMessageException; -import com.bsmlab.dfx.agent.task.dropbox.DropBoxDto; import com.bsmlab.dfx.agent.task.dropbox.DropBoxService; import jakarta.servlet.http.Part; import lombok.RequiredArgsConstructor; diff --git a/src/main/java/com/bsmlab/dfx/agent/task/PostmanSchedulerService.java b/src/main/java/com/bsmlab/dfx/agent/task/PostmanSchedulerService.java deleted file mode 100644 index df127e5..0000000 --- a/src/main/java/com/bsmlab/dfx/agent/task/PostmanSchedulerService.java +++ /dev/null @@ -1,115 +0,0 @@ -package com.bsmlab.dfx.agent.task; - -import com.bsmlab.dfx.agent.config.AgentConfigDto; -import com.bsmlab.dfx.agent.config.AgentConfigReader; -import com.bsmlab.dfx.agent.config.datasource.SqlExecuteService; -import com.bsmlab.dfx.agent.listener.dto.AckDto; -import com.bsmlab.dfx.agent.listener.dto.ReceiveMessageDto; -import com.bsmlab.dfx.agent.support.exception.DfxException; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import jakarta.annotation.PostConstruct; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.http.HttpEntity; -import org.springframework.http.HttpHeaders; -import org.springframework.http.MediaType; -import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; -import org.springframework.scheduling.support.CronTrigger; -import org.springframework.stereotype.Service; -import org.springframework.web.client.RestTemplate; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ScheduledFuture; - -@Service -@Slf4j -@RequiredArgsConstructor -public class PostmanSchedulerService { - private final ThreadPoolTaskScheduler threadPoolTaskScheduler; - private final AgentConfigReader agentConfigReader; - private final SqlExecuteService sqlExecuteService; - private Map> scheduledFutureMap = new HashMap<>(); - private Map postmanMap = new HashMap<>(); - - public void startPostman(String postmanId) { - AgentConfigDto.Postman postman = this.postmanMap.get(postmanId); - this.stop(postmanId); - String cron = postman.getAction().getCron(); - ScheduledFuture scheduledFuture = threadPoolTaskScheduler.schedule(() -> run(postman), new CronTrigger(cron)); - scheduledFutureMap.put(postmanId, scheduledFuture); - } - - private void stop(String postmanId) { - if(this.scheduledFutureMap.get(postmanId) != null) { - ScheduledFuture scheduledFuture = this.scheduledFutureMap.get(postmanId); - scheduledFuture.cancel(false); - this.scheduledFutureMap.remove(postmanId); - } - } - - @PostConstruct - public void launch() { - List postmanList = agentConfigReader.getScheduledTypePostmanList(); - for(AgentConfigDto.Postman postman : postmanList) { - this.startPostman(postman.getPostmanId()); - } - /* - { - "postman-id": "postman2", - "task-type": "DB_READ_THEN_SEND", - "action": { - "type": "SCHEDULED", - "cron": "1 0 3 * * *" - }, - "message": { - "message-type": "SAVE_DB_DATA", - "dataSourceId": "dfcms", - "sql-id": "dfcms.selectSome" - }, - "recipient-host-id": "third-agent", - "recipient-drop-box-id": "drop3", - "routing-host-id-list": ["first-agent", "second-agent", "third-agent"] - } - */ - } - - public void run(AgentConfigDto.Postman postman) { - ObjectMapper objectMapper = new ObjectMapper(); - String senderHostId = agentConfigReader.getAgentConfigDto().getHostId(); - long senderTimestamp = System.currentTimeMillis(); - String messageUuid = UUID.randomUUID().toString(); - // DB TO DB 전송 - if(AgentConfigDto.MessageType.SAVE_DB_DATA == postman.getMessage().getMessageType()) { - String dataSourceId = postman.getMessage().getDataSourceId(); - String sqlId = postman.getMessage().getSqlId(); - try { - List> dataMapList = sqlExecuteService.select(dataSourceId, sqlId, null); - String dataString = objectMapper.writeValueAsString(dataMapList); - ReceiveMessageDto receiveMessageDto = ReceiveMessageDto.builder().senderHostId(senderHostId).senderTimestamp(senderTimestamp) - .messageUuid(messageUuid).messageType(AgentConfigDto.MessageType.SAVE_DB_DATA) - .recipientHostId(postman.getRecipientHostId()).recipientDropBoxId(postman.getRecipientDropBoxId()) - .data(dataString).build(); - HttpHeaders httpHeaders = new HttpHeaders(); - httpHeaders.setContentType(MediaType.APPLICATION_JSON); - HttpEntity bodyEntity = new HttpEntity<>(receiveMessageDto, httpHeaders); - RestTemplate restTemplate = new RestTemplate(); - AgentConfigDto.KnownAgent knownAgent = agentConfigReader.getKnownAgent(postman.getRecipientHostId()); - String url = "https://" + knownAgent.getHostName() + ":" + knownAgent.getListenPort() + "/listen"; - String response = restTemplate.postForObject(url, bodyEntity, String.class); - AckDto ackDto = objectMapper.readValue(response, new TypeReference() {}); - if(AckDto.ResultType.RECEIVE_SUCCESS != ackDto.getResult()) { - throw new DfxException(postman.getRecipientHostId() + "에게 전송하였으나 상대방이 수신하지 못하였습니다." + response); - } - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } catch (DfxException e) { - throw new RuntimeException(e); - } - } - } -} diff --git a/src/main/java/com/bsmlab/dfx/agent/task/TaskExecutorStarter.java b/src/main/java/com/bsmlab/dfx/agent/task/TaskExecutorStarter.java deleted file mode 100644 index 423cc9d..0000000 --- a/src/main/java/com/bsmlab/dfx/agent/task/TaskExecutorStarter.java +++ /dev/null @@ -1,42 +0,0 @@ -package com.bsmlab.dfx.agent.task; - -import com.bsmlab.dfx.agent.config.AgentConfigDto; -import com.bsmlab.dfx.agent.config.AgentConfigReader; -import com.bsmlab.dfx.agent.task.dropbox.DropBoxService; -import io.micrometer.common.util.StringUtils; -import jakarta.annotation.PostConstruct; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -import java.util.List; - -@Component -@RequiredArgsConstructor -@Slf4j -public class TaskExecutorStarter { - private final AgentConfigReader agentConfigReader; - private final DropBoxService dropBoxService; - private final TaskExecutorService taskExecutorService; - - @PostConstruct - public void run() { - List scheduledPostmanList = agentConfigReader.getScheduledTypePostmanList(); - for(AgentConfigDto.Postman postman : scheduledPostmanList) { - taskExecutorService.processPostman(postman); - } - - while(true) { - try { - Thread.sleep(10); - String messageFilePath = dropBoxService.poll(); - if(StringUtils.isNotBlank(messageFilePath)) { - taskExecutorService.processDropBox(messageFilePath); - } - } catch (InterruptedException e) { - log.error("{}", e, e); - Thread.currentThread().interrupt(); - } - } - } -} diff --git a/src/main/java/com/bsmlab/dfx/agent/task/dropbox/DropBoxDto.java b/src/main/java/com/bsmlab/dfx/agent/task/dropbox/DropBoxDto.java deleted file mode 100644 index ace18a8..0000000 --- a/src/main/java/com/bsmlab/dfx/agent/task/dropbox/DropBoxDto.java +++ /dev/null @@ -1,15 +0,0 @@ -package com.bsmlab.dfx.agent.task.dropbox; - -import com.bsmlab.dfx.agent.config.AgentConfigDto; -import lombok.Builder; -import lombok.Data; - -@Data -@Builder -public class DropBoxDto { - private String dropBoxId; - private AgentConfigDto.TaskType taskType; - private String dataSourceId; - private String sqlId; - private String saveDirectoryRoot; -} diff --git a/src/main/java/com/bsmlab/dfx/agent/task/TaskExecutorService.java b/src/main/java/com/bsmlab/dfx/agent/task/dropbox/DropBoxTaskExecutorService.java similarity index 93% rename from src/main/java/com/bsmlab/dfx/agent/task/TaskExecutorService.java rename to src/main/java/com/bsmlab/dfx/agent/task/dropbox/DropBoxTaskExecutorService.java index a0ea216..8696513 100644 --- a/src/main/java/com/bsmlab/dfx/agent/task/TaskExecutorService.java +++ b/src/main/java/com/bsmlab/dfx/agent/task/dropbox/DropBoxTaskExecutorService.java @@ -1,14 +1,14 @@ -package com.bsmlab.dfx.agent.task; +package com.bsmlab.dfx.agent.task.dropbox; import com.bsmlab.dfx.agent.config.AgentConfigDto; import com.bsmlab.dfx.agent.config.AgentConfigReader; import com.bsmlab.dfx.agent.config.datasource.SqlExecuteService; import com.bsmlab.dfx.agent.listener.dto.AckDto; import com.bsmlab.dfx.agent.listener.dto.ReceiveMessageDto; -import com.bsmlab.dfx.agent.task.dropbox.DropBoxService; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.micrometer.common.util.StringUtils; +import jakarta.annotation.PostConstruct; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -30,7 +30,7 @@ import java.util.Map; @Service @RequiredArgsConstructor @Slf4j -public class TaskExecutorService { +public class DropBoxTaskExecutorService { private final AgentConfigReader agentConfigReader; private final SqlExecuteService sqlExecuteService; private final DropBoxService dropBoxService; @@ -54,7 +54,7 @@ public class TaskExecutorService { AgentConfigDto.DropBox dropBox = agentConfigReader.getDropBox(receiveMessageDto.getRecipientDropBoxId()); log.info("process messageUuid:{} dropBoxId: {}", receiveMessageDto.getMessageUuid(), dropBox.getDropBoxId()); //1. DB 저장 메시지 처리 - if(dropBox.getTaskType() == AgentConfigDto.TaskType.SAVE_DB_TABLE) { + if(dropBox.getTaskType() == AgentConfigDto.TaskType.RECEIVE_DB_TO_DB_SAVE) { ObjectMapper objectMapper = new ObjectMapper(); List> dataMapList = null; dataMapList = (List>) objectMapper.readValue(receiveMessageDto.getData(), List.class); @@ -129,12 +129,6 @@ public class TaskExecutorService { } } - //@Async("threadPoolTaskExecutor") - public void processPostman(AgentConfigDto.Postman postman) { - - - } - private void ackDropBoxProcessResult(ReceiveMessageDto receiveMessageDto, String processMessage) { AckDto.ResultType resultType = EnumUtils.getEnum(AckDto.ResultType.class, receiveMessageDto.getProcessStatus().toString()); AckDto ackDto = AckDto.builder().result(resultType).messageUuid(receiveMessageDto.getMessageUuid()).resultText(processMessage).build(); @@ -156,4 +150,21 @@ public class TaskExecutorService { log.error("{}", e, e); } } + + @PostConstruct + public void run() { + while(true) { + try { + Thread.sleep(10); + String messageFilePath = dropBoxService.poll(); + if(StringUtils.isNotBlank(messageFilePath)) { + this.processDropBox(messageFilePath); + } + } catch (InterruptedException e) { + log.error("{}", e, e); + Thread.currentThread().interrupt(); + } + } + } + } diff --git a/src/main/java/com/bsmlab/dfx/agent/task/postman/PostmanDto.java b/src/main/java/com/bsmlab/dfx/agent/task/postman/PostmanDto.java deleted file mode 100644 index c3d265f..0000000 --- a/src/main/java/com/bsmlab/dfx/agent/task/postman/PostmanDto.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.bsmlab.dfx.agent.task.postman; - -import com.bsmlab.dfx.agent.config.AgentConfigDto; -import lombok.Builder; -import lombok.Data; - -import java.util.ArrayList; -import java.util.List; - -@Data -@Builder -public class PostmanDto { - private String postmanId; - private AgentConfigDto.TaskType taskType; - private PostmanActionType postmanActionType; - private PostmanMessageType postmanMessageType; - private String recipientHostId; - private String recipientDropBoxId; - private List routingHostIdList; - - @Data - public static class PostmanActionType { - private AgentConfigDto.ActionType actionType; - private String command; - private List parameterKeyList = new ArrayList<>(); - private String cron; - } - - @Data - public static class PostmanMessageType { - private AgentConfigDto.MessageType messageType; - private String dataSourceId; - private String sqlId; - } - -} diff --git a/src/main/java/com/bsmlab/dfx/agent/task/postman/PostmanSchedulerService.java b/src/main/java/com/bsmlab/dfx/agent/task/postman/PostmanSchedulerService.java new file mode 100644 index 0000000..851f80e --- /dev/null +++ b/src/main/java/com/bsmlab/dfx/agent/task/postman/PostmanSchedulerService.java @@ -0,0 +1,200 @@ +package com.bsmlab.dfx.agent.task.postman; + +import com.bsmlab.dfx.agent.config.AgentConfigDto; +import com.bsmlab.dfx.agent.config.AgentConfigReader; +import com.bsmlab.dfx.agent.config.datasource.SqlExecuteService; +import com.bsmlab.dfx.agent.listener.dto.AckDto; +import com.bsmlab.dfx.agent.listener.dto.ReceiveMessageDto; +import com.bsmlab.dfx.agent.support.exception.DfxException; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import jakarta.annotation.PostConstruct; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.core.io.FileSystemResource; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.scheduling.support.CronTrigger; +import org.springframework.stereotype.Service; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; +import org.springframework.web.client.RestTemplate; + +import java.io.File; +import java.io.IOException; +import java.nio.file.CopyOption; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.text.SimpleDateFormat; +import java.util.*; +import java.util.concurrent.ScheduledFuture; + +@Service +@Slf4j +@RequiredArgsConstructor +public class PostmanSchedulerService { + private final ThreadPoolTaskScheduler threadPoolTaskScheduler; + private final AgentConfigReader agentConfigReader; + private final SqlExecuteService sqlExecuteService; + private Map> scheduledFutureMap = new HashMap<>(); + private Map postmanMap = new HashMap<>(); + + private void startPostman(String postmanId) { + AgentConfigDto.Postman postman = this.postmanMap.get(postmanId); + this.stop(postmanId); + String cron = postman.getAction().getCron(); + ScheduledFuture scheduledFuture = threadPoolTaskScheduler.schedule(() -> run(postman), new CronTrigger(cron)); + scheduledFutureMap.put(postmanId, scheduledFuture); + } + + private void stop(String postmanId) { + if(this.scheduledFutureMap.get(postmanId) != null) { + ScheduledFuture scheduledFuture = this.scheduledFutureMap.get(postmanId); + scheduledFuture.cancel(false); + this.scheduledFutureMap.remove(postmanId); + } + } + + @PostConstruct + public void launch() { + List postmanList = agentConfigReader.getScheduledTypePostmanList(); + for(AgentConfigDto.Postman postman : postmanList) { + this.startPostman(postman.getPostmanId()); + } + } + + public void run(AgentConfigDto.Postman postman) { + ObjectMapper objectMapper = new ObjectMapper(); + String senderHostId = agentConfigReader.getAgentConfigDto().getHostId(); + long senderTimestamp = System.currentTimeMillis(); + String messageUuid = UUID.randomUUID().toString(); + // DB TO DB 전송 + if(AgentConfigDto.TaskType.DB_READ_THEN_SEND == postman.getTaskType()) { + String dataSourceId = postman.getMessage().getDataSourceId(); + String sqlId = postman.getMessage().getSqlId(); + try { + List> dataMapList = sqlExecuteService.select(dataSourceId, sqlId, null); + String dataString = objectMapper.writeValueAsString(dataMapList); + ReceiveMessageDto receiveMessageDto = ReceiveMessageDto.builder().senderHostId(senderHostId).senderTimestamp(senderTimestamp) + .messageUuid(messageUuid).messageType(AgentConfigDto.MessageType.TRANSFER_DB_TO_DB) + .recipientHostId(postman.getRecipientHostId()).recipientDropBoxId(postman.getRecipientDropBoxId()) + .data(dataString).build(); + HttpHeaders httpHeaders = new HttpHeaders(); + httpHeaders.setContentType(MediaType.APPLICATION_JSON); + HttpEntity bodyEntity = new HttpEntity<>(receiveMessageDto, httpHeaders); + RestTemplate restTemplate = new RestTemplate(); + AgentConfigDto.KnownAgent knownAgent = agentConfigReader.getKnownAgent(postman.getRecipientHostId()); + String url = "https://" + knownAgent.getHostName() + ":" + knownAgent.getListenPort() + "/listen"; + String response = restTemplate.postForObject(url, bodyEntity, String.class); + AckDto ackDto = objectMapper.readValue(response, new TypeReference() {}); + if(AckDto.ResultType.RECEIVE_SUCCESS != ackDto.getResult()) { + throw new DfxException(postman.getRecipientHostId() + "에게 전송하였으나 상대방이 수신하지 못하였습니다." + response); + } + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } catch (DfxException e) { + throw new RuntimeException(e); + } + } + else if(AgentConfigDto.TaskType.FILE_READ_THEN_SEND == postman.getTaskType()) { + File rootDirectoryfile = new File(postman.getMessage().getWatchDirectory()); + SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy/MM/dd/hh/mm/ss"); + Date today = new Date(System.currentTimeMillis()); + String dateString = simpleDateFormat.format(today); + List fileReadyList = new ArrayList<>(); + List workingFileList = new ArrayList<>(); + if(rootDirectoryfile.isDirectory() && rootDirectoryfile.listFiles().length > 0) { + // 대상 파일 찾기 -> 작업 디렉토리(working)로 이동 -> 작업 완료 후 완료 디렉토리(done)로 이동 + File[] files = rootDirectoryfile.listFiles(); + for(File file : files) { + if(file.isFile()) { + fileReadyList.add(file); + } + } + List> data = new ArrayList<>(); + String dataString = null; + String workingDirectory = rootDirectoryfile.getAbsolutePath() + "/working/" + dateString; + File workingDirectoryFile = new File(workingDirectory); + if(!workingDirectoryFile.exists()) { + workingDirectoryFile.mkdirs(); + } + try { + // 작업 디렉토리(working)로 이동 + for(File file : fileReadyList) { + Files.move(file.toPath(), workingDirectoryFile.toPath(), StandardCopyOption.REPLACE_EXISTING); + workingFileList.add(new File(workingDirectoryFile.getAbsoluteFile() + "/" + file.getName())); + } + // ReceiveMessageDto - messageJson 만들기 + for(File file : workingFileList) { + Map map = new HashMap<>(); + Map paramMap = new HashMap<>(); + map.put("file-name", file.getName()); + if(postman.getMessage() != null && StringUtils.isNotEmpty(postman.getMessage().getMetaDropBoxId()) + && StringUtils.isNotEmpty(postman.getMessage().getMetaDataSqlId()) && StringUtils.isNotEmpty(postman.getMessage().getMetaDataDataSourceId())) { + map.put("meta-drop-box-id", postman.getMessage().getMetaDropBoxId()); + paramMap.put("fileName", file.getName()); + List> messageDataMapList = sqlExecuteService.select(postman.getMessage().getDataSourceId(), postman.getMessage().getMetaDataSqlId(), paramMap); + if(messageDataMapList != null && messageDataMapList.get(0) != null) { + map.put("meta-data", messageDataMapList.get(0)); + } + } + data.add(map); + } + dataString = objectMapper.writeValueAsString(data); + ReceiveMessageDto receiveMessageDto = ReceiveMessageDto.builder().senderHostId(senderHostId).senderTimestamp(senderTimestamp) + .messageUuid(messageUuid).messageType(AgentConfigDto.MessageType.TRANSFER_DB_TO_DB) + .recipientHostId(postman.getRecipientHostId()).recipientDropBoxId(postman.getRecipientDropBoxId()) + .data(dataString).build(); + String messageString = objectMapper.writeValueAsString(receiveMessageDto); + // http 준비 + HttpHeaders httpHeaders = new HttpHeaders(); + httpHeaders.setContentType(MediaType.MULTIPART_FORM_DATA); + MultiValueMap body = new LinkedMultiValueMap<>(); + // 첫 번째 멀티파트는 message json + body.add("json", new HttpEntity<>(messageString, httpHeaders)); + // 두 번째 이후 멀티파트는 파일 + for(File file : workingFileList) { + body.add(file.getName(), new FileSystemResource(file)); + } + // 전송 + RestTemplate restTemplate = new RestTemplate(); + AgentConfigDto.KnownAgent knownAgent = agentConfigReader.getKnownAgent(postman.getRecipientHostId()); + String url = "https://" + knownAgent.getHostName() + ":" + knownAgent.getListenPort() + "/listen"; + String response = restTemplate.postForObject(url, body, String.class); + AckDto ackDto = objectMapper.readValue(response, new TypeReference() {}); + if(AckDto.ResultType.RECEIVE_SUCCESS != ackDto.getResult()) { + throw new DfxException(postman.getRecipientHostId() + "에게 전송하였으나 상대방이 수신하지 못하였습니다." + response); + } + else { + // 작업 완료(done) 디렉토리로 이동 + String doneDirectory = rootDirectoryfile + "/done/" + dateString; + File doneDirectoryFile = new File(doneDirectory); + if(!doneDirectoryFile.exists()) { + doneDirectoryFile.mkdirs(); + } + for(File file : workingFileList) { + Files.move(file.toPath(), workingDirectoryFile.toPath(), StandardCopyOption.REPLACE_EXISTING); + } + } + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } catch (DfxException e) { + throw new RuntimeException(e); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + +// Path startPath = Paths.get("C:/example"); // 시작 디렉토리 경로 +// +// try (Stream stream = Files.walk(startPath)) { +// stream.filter(Files::isRegularFile) +// .forEach(System.out::println); +// } + } + } +}