parent
76788090af
commit
b9547d6b21
@ -1,216 +0,0 @@
|
||||
### ✅ **Spring Boot에서 동적으로 N개의 데이터소스를 추가하고, 특정 이름에 따른 Connection Pool로 관리하기**
|
||||
> 🚀 **요구사항 업데이트:**
|
||||
> 1. **각 데이터소스는 특정한 이름(`dataSourceName`)으로 관리**.
|
||||
> 2. **각 데이터소스마다 개별적인 Connection Pool을 사용**.
|
||||
> 3. **동적으로 데이터소스를 추가할 수 있어야 함**.
|
||||
> 4. **각 데이터소스는 `TransactionManager`에 의해 관리됨**.
|
||||
> 5. **MyBatis와 연동**.
|
||||
|
||||
---
|
||||
|
||||
## **🔹 1. 데이터소스를 Connection Pool로 관리하는 `DynamicRoutingDataSource` 구현**
|
||||
각 데이터소스는 **Connection Pool(DBCP2)를 사용하여 동적으로 추가**하며,
|
||||
각 요청 시 **ThreadLocal을 이용하여 특정 데이터소스를 선택**할 수 있도록 `AbstractRoutingDataSource`를 활용합니다.
|
||||
|
||||
📄 **`DynamicRoutingDataSource.java`**
|
||||
```java
|
||||
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
|
||||
import javax.sql.DataSource;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
public class DynamicRoutingDataSource extends AbstractRoutingDataSource {
|
||||
|
||||
private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();
|
||||
private final Map<Object, Object> dataSourceMap = new ConcurrentHashMap<>();
|
||||
|
||||
@Override
|
||||
protected Object determineCurrentLookupKey() {
|
||||
return contextHolder.get();
|
||||
}
|
||||
|
||||
public void setDataSource(String dataSourceName) {
|
||||
contextHolder.set(dataSourceName);
|
||||
}
|
||||
|
||||
public void clearDataSource() {
|
||||
contextHolder.remove();
|
||||
}
|
||||
|
||||
public void addDataSource(String name, DataSource dataSource) {
|
||||
dataSourceMap.put(name, dataSource);
|
||||
super.setTargetDataSources(dataSourceMap);
|
||||
super.afterPropertiesSet();
|
||||
}
|
||||
|
||||
public Map<Object, Object> getDataSourceMap() {
|
||||
return dataSourceMap;
|
||||
}
|
||||
}
|
||||
```
|
||||
✅ **기능:**
|
||||
- `setDataSource("dataSourceName")`을 호출하면 **해당 데이터소스를 선택**하여 사용.
|
||||
- `addDataSource("dataSourceName", DataSource)`을 통해 **새로운 데이터소스를 동적으로 추가 가능**.
|
||||
- `ConcurrentHashMap`을 사용하여 **Connection Pool을 효율적으로 관리**.
|
||||
|
||||
---
|
||||
|
||||
## **🔹 2. 데이터소스 설정 및 Connection Pool 관리**
|
||||
각 데이터소스는 **DBCP2 커넥션 풀**을 사용하여 생성됩니다.
|
||||
|
||||
📄 **`DynamicDataSourceConfig.java`**
|
||||
```java
|
||||
import org.apache.ibatis.session.SqlSessionFactory;
|
||||
import org.apache.ibatis.session.SqlSessionTemplate;
|
||||
import org.apache.ibatis.datasource.pooled.PooledDataSource;
|
||||
import org.mybatis.spring.SqlSessionFactoryBean;
|
||||
import org.mybatis.spring.annotation.MapperScan;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@Configuration
|
||||
@MapperScan(basePackages = "com.example.mapper", sqlSessionTemplateRef = "dynamicSqlSessionTemplate")
|
||||
public class DynamicDataSourceConfig {
|
||||
|
||||
@Bean
|
||||
public DynamicRoutingDataSource dynamicRoutingDataSource() {
|
||||
DynamicRoutingDataSource dynamicRoutingDataSource = new DynamicRoutingDataSource();
|
||||
|
||||
// 기본 데이터소스 등록
|
||||
DataSource defaultDataSource = createDataSource("jdbc:mysql://localhost:3306/default_db", "root", "password");
|
||||
dynamicRoutingDataSource.addDataSource("default", defaultDataSource);
|
||||
|
||||
return dynamicRoutingDataSource;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public DataSourceTransactionManager transactionManager(DynamicRoutingDataSource dynamicRoutingDataSource) {
|
||||
return new DataSourceTransactionManager(dynamicRoutingDataSource);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public SqlSessionFactory sqlSessionFactory(DynamicRoutingDataSource dynamicRoutingDataSource) throws Exception {
|
||||
SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
|
||||
sessionFactory.setDataSource(dynamicRoutingDataSource);
|
||||
return sessionFactory.getObject();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public SqlSessionTemplate dynamicSqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
|
||||
return new SqlSessionTemplate(sqlSessionFactory);
|
||||
}
|
||||
|
||||
private DataSource createDataSource(String url, String username, String password) {
|
||||
// DBCP2 Connection Pool 사용
|
||||
PooledDataSource dataSource = new PooledDataSource();
|
||||
dataSource.setDriver("com.mysql.cj.jdbc.Driver");
|
||||
dataSource.setUrl(url);
|
||||
dataSource.setUsername(username);
|
||||
dataSource.setPassword(password);
|
||||
return dataSource;
|
||||
}
|
||||
}
|
||||
```
|
||||
✅ **기능:**
|
||||
- `createDataSource()`를 사용하여 **각각의 데이터소스를 DBCP2 기반 Connection Pool로 생성**.
|
||||
- `dynamicRoutingDataSource.addDataSource("default", defaultDataSource);`를 호출하여 기본 데이터소스를 등록.
|
||||
|
||||
---
|
||||
|
||||
## **🔹 3. 새로운 데이터소스를 동적으로 추가하는 서비스**
|
||||
📄 **`DynamicDataSourceService.java`**
|
||||
```java
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import javax.sql.DataSource;
|
||||
|
||||
@Service
|
||||
public class DynamicDataSourceService {
|
||||
|
||||
@Autowired
|
||||
private DynamicRoutingDataSource dynamicRoutingDataSource;
|
||||
|
||||
public void addNewDataSource(String dataSourceName, String url, String username, String password) {
|
||||
DataSource newDataSource = createDataSource(url, username, password);
|
||||
dynamicRoutingDataSource.addDataSource(dataSourceName, newDataSource);
|
||||
System.out.println("✅ 새로운 데이터소스 추가됨: " + dataSourceName);
|
||||
}
|
||||
|
||||
private DataSource createDataSource(String url, String username, String password) {
|
||||
PooledDataSource dataSource = new PooledDataSource();
|
||||
dataSource.setDriver("com.mysql.cj.jdbc.Driver");
|
||||
dataSource.setUrl(url);
|
||||
dataSource.setUsername(username);
|
||||
dataSource.setPassword(password);
|
||||
return dataSource;
|
||||
}
|
||||
}
|
||||
```
|
||||
✅ **기능:**
|
||||
- `addNewDataSource("newDB", "jdbc:mysql://localhost:3306/new_db", "user", "pass")`을 호출하면 **새로운 데이터소스를 동적으로 추가**.
|
||||
- `dynamicRoutingDataSource.addDataSource(dataSourceName, newDataSource)`을 사용하여 **데이터소스를 관리하는 Connection Pool에 추가**.
|
||||
|
||||
---
|
||||
|
||||
## **🔹 4. 특정 데이터소스를 선택하여 트랜잭션 처리**
|
||||
📄 **`UserService.java`**
|
||||
```java
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
@Service
|
||||
public class UserService {
|
||||
|
||||
@Autowired
|
||||
private UserMapper userMapper;
|
||||
|
||||
public void fetchUsersFromDefault() {
|
||||
DynamicRoutingDataSource.setDataSource("default");
|
||||
System.out.println("🔹 Default DB에서 데이터 조회");
|
||||
System.out.println(userMapper.findAll());
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public void fetchUsersFromNewDB() {
|
||||
DynamicRoutingDataSource.setDataSource("newDB");
|
||||
System.out.println("🔹 New DB에서 데이터 조회");
|
||||
System.out.println(userMapper.findAll());
|
||||
}
|
||||
}
|
||||
```
|
||||
📄 **`UserMapper.java`**
|
||||
```java
|
||||
import org.apache.ibatis.annotations.Mapper;
|
||||
import org.apache.ibatis.annotations.Select;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@Mapper
|
||||
public interface UserMapper {
|
||||
@Select("SELECT * FROM users")
|
||||
List<Map<String, Object>> findAll();
|
||||
}
|
||||
```
|
||||
✅ **기능:**
|
||||
- `DynamicRoutingDataSource.setDataSource("newDB")`를 호출하면 **특정 데이터소스를 선택**.
|
||||
- MyBatis에서 **선택된 데이터소스를 기반으로 SQL 실행**.
|
||||
- `@Transactional`을 사용하여 **선택된 데이터소스에서 트랜잭션 처리** 가능.
|
||||
|
||||
---
|
||||
|
||||
## ✅ **최종 정리**
|
||||
| 기능 | 코드 |
|
||||
|------|------|
|
||||
| **동적으로 데이터소스 변경** | `DynamicRoutingDataSource.setDataSource("dataSourceName")` |
|
||||
| **새로운 데이터소스 추가** | `dynamicDataSourceService.addNewDataSource("newDB", "url", "user", "pass")` |
|
||||
| **Connection Pool 관리** | `PooledDataSource` (MyBatis DBCP2 사용) |
|
||||
| **트랜잭션 관리** | `@Transactional` 사용 |
|
||||
| **MyBatis 연동** | `SqlSessionFactory` + `SqlSessionTemplate` 사용 |
|
||||
|
||||
✅ **이제 애플리케이션 실행 중에도 N개의 데이터소스를 추가하고, Connection Pool로 관리하며, MyBatis와 연동하여 사용할 수 있습니다!** 🚀
|
||||
@ -1,12 +0,0 @@
|
||||
datasource:
|
||||
dfcms:
|
||||
driverClassName: org.postgresql.Driver
|
||||
url: jdbc:postgresql://bsm-lab.com:5432/defree?currentSchema=DFCMS
|
||||
username: defreeadmin
|
||||
password: qortpals1!
|
||||
mochastory:
|
||||
driverClassName: com.mysql.jdbc.Driver
|
||||
url: jdbc:mysql://bsm-lab.com:3306/MOCHASTORY?allowPublicKeyRetrieval=true
|
||||
username: MOCHASTORY
|
||||
password: MOCHASTORY
|
||||
|
||||
@ -1,115 +0,0 @@
|
||||
package com.bsmlab.dfx.agent.task;
|
||||
|
||||
import com.bsmlab.dfx.agent.config.AgentConfigDto;
|
||||
import com.bsmlab.dfx.agent.config.AgentConfigReader;
|
||||
import com.bsmlab.dfx.agent.config.datasource.SqlExecuteService;
|
||||
import com.bsmlab.dfx.agent.listener.dto.AckDto;
|
||||
import com.bsmlab.dfx.agent.listener.dto.ReceiveMessageDto;
|
||||
import com.bsmlab.dfx.agent.support.exception.DfxException;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.http.HttpEntity;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
|
||||
import org.springframework.scheduling.support.CronTrigger;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.web.client.RestTemplate;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class PostmanSchedulerService {
|
||||
private final ThreadPoolTaskScheduler threadPoolTaskScheduler;
|
||||
private final AgentConfigReader agentConfigReader;
|
||||
private final SqlExecuteService sqlExecuteService;
|
||||
private Map<String, ScheduledFuture<?>> scheduledFutureMap = new HashMap<>();
|
||||
private Map<String, AgentConfigDto.Postman> postmanMap = new HashMap<>();
|
||||
|
||||
public void startPostman(String postmanId) {
|
||||
AgentConfigDto.Postman postman = this.postmanMap.get(postmanId);
|
||||
this.stop(postmanId);
|
||||
String cron = postman.getAction().getCron();
|
||||
ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(() -> run(postman), new CronTrigger(cron));
|
||||
scheduledFutureMap.put(postmanId, scheduledFuture);
|
||||
}
|
||||
|
||||
private void stop(String postmanId) {
|
||||
if(this.scheduledFutureMap.get(postmanId) != null) {
|
||||
ScheduledFuture<?> scheduledFuture = this.scheduledFutureMap.get(postmanId);
|
||||
scheduledFuture.cancel(false);
|
||||
this.scheduledFutureMap.remove(postmanId);
|
||||
}
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
public void launch() {
|
||||
List<AgentConfigDto.Postman> postmanList = agentConfigReader.getScheduledTypePostmanList();
|
||||
for(AgentConfigDto.Postman postman : postmanList) {
|
||||
this.startPostman(postman.getPostmanId());
|
||||
}
|
||||
/*
|
||||
{
|
||||
"postman-id": "postman2",
|
||||
"task-type": "DB_READ_THEN_SEND",
|
||||
"action": {
|
||||
"type": "SCHEDULED",
|
||||
"cron": "1 0 3 * * *"
|
||||
},
|
||||
"message": {
|
||||
"message-type": "SAVE_DB_DATA",
|
||||
"dataSourceId": "dfcms",
|
||||
"sql-id": "dfcms.selectSome"
|
||||
},
|
||||
"recipient-host-id": "third-agent",
|
||||
"recipient-drop-box-id": "drop3",
|
||||
"routing-host-id-list": ["first-agent", "second-agent", "third-agent"]
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
public void run(AgentConfigDto.Postman postman) {
|
||||
ObjectMapper objectMapper = new ObjectMapper();
|
||||
String senderHostId = agentConfigReader.getAgentConfigDto().getHostId();
|
||||
long senderTimestamp = System.currentTimeMillis();
|
||||
String messageUuid = UUID.randomUUID().toString();
|
||||
// DB TO DB 전송
|
||||
if(AgentConfigDto.MessageType.SAVE_DB_DATA == postman.getMessage().getMessageType()) {
|
||||
String dataSourceId = postman.getMessage().getDataSourceId();
|
||||
String sqlId = postman.getMessage().getSqlId();
|
||||
try {
|
||||
List<Map<String, Object>> dataMapList = sqlExecuteService.select(dataSourceId, sqlId, null);
|
||||
String dataString = objectMapper.writeValueAsString(dataMapList);
|
||||
ReceiveMessageDto receiveMessageDto = ReceiveMessageDto.builder().senderHostId(senderHostId).senderTimestamp(senderTimestamp)
|
||||
.messageUuid(messageUuid).messageType(AgentConfigDto.MessageType.SAVE_DB_DATA)
|
||||
.recipientHostId(postman.getRecipientHostId()).recipientDropBoxId(postman.getRecipientDropBoxId())
|
||||
.data(dataString).build();
|
||||
HttpHeaders httpHeaders = new HttpHeaders();
|
||||
httpHeaders.setContentType(MediaType.APPLICATION_JSON);
|
||||
HttpEntity<ReceiveMessageDto> bodyEntity = new HttpEntity<>(receiveMessageDto, httpHeaders);
|
||||
RestTemplate restTemplate = new RestTemplate();
|
||||
AgentConfigDto.KnownAgent knownAgent = agentConfigReader.getKnownAgent(postman.getRecipientHostId());
|
||||
String url = "https://" + knownAgent.getHostName() + ":" + knownAgent.getListenPort() + "/listen";
|
||||
String response = restTemplate.postForObject(url, bodyEntity, String.class);
|
||||
AckDto ackDto = objectMapper.readValue(response, new TypeReference<AckDto>() {});
|
||||
if(AckDto.ResultType.RECEIVE_SUCCESS != ackDto.getResult()) {
|
||||
throw new DfxException(postman.getRecipientHostId() + "에게 전송하였으나 상대방이 수신하지 못하였습니다." + response);
|
||||
}
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch (DfxException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,42 +0,0 @@
|
||||
package com.bsmlab.dfx.agent.task;
|
||||
|
||||
import com.bsmlab.dfx.agent.config.AgentConfigDto;
|
||||
import com.bsmlab.dfx.agent.config.AgentConfigReader;
|
||||
import com.bsmlab.dfx.agent.task.dropbox.DropBoxService;
|
||||
import io.micrometer.common.util.StringUtils;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class TaskExecutorStarter {
|
||||
private final AgentConfigReader agentConfigReader;
|
||||
private final DropBoxService dropBoxService;
|
||||
private final TaskExecutorService taskExecutorService;
|
||||
|
||||
@PostConstruct
|
||||
public void run() {
|
||||
List<AgentConfigDto.Postman> scheduledPostmanList = agentConfigReader.getScheduledTypePostmanList();
|
||||
for(AgentConfigDto.Postman postman : scheduledPostmanList) {
|
||||
taskExecutorService.processPostman(postman);
|
||||
}
|
||||
|
||||
while(true) {
|
||||
try {
|
||||
Thread.sleep(10);
|
||||
String messageFilePath = dropBoxService.poll();
|
||||
if(StringUtils.isNotBlank(messageFilePath)) {
|
||||
taskExecutorService.processDropBox(messageFilePath);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
log.error("{}", e, e);
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,15 +0,0 @@
|
||||
package com.bsmlab.dfx.agent.task.dropbox;
|
||||
|
||||
import com.bsmlab.dfx.agent.config.AgentConfigDto;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
public class DropBoxDto {
|
||||
private String dropBoxId;
|
||||
private AgentConfigDto.TaskType taskType;
|
||||
private String dataSourceId;
|
||||
private String sqlId;
|
||||
private String saveDirectoryRoot;
|
||||
}
|
||||
@ -1,36 +0,0 @@
|
||||
package com.bsmlab.dfx.agent.task.postman;
|
||||
|
||||
import com.bsmlab.dfx.agent.config.AgentConfigDto;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
public class PostmanDto {
|
||||
private String postmanId;
|
||||
private AgentConfigDto.TaskType taskType;
|
||||
private PostmanActionType postmanActionType;
|
||||
private PostmanMessageType postmanMessageType;
|
||||
private String recipientHostId;
|
||||
private String recipientDropBoxId;
|
||||
private List<String> routingHostIdList;
|
||||
|
||||
@Data
|
||||
public static class PostmanActionType {
|
||||
private AgentConfigDto.ActionType actionType;
|
||||
private String command;
|
||||
private List<String> parameterKeyList = new ArrayList<>();
|
||||
private String cron;
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class PostmanMessageType {
|
||||
private AgentConfigDto.MessageType messageType;
|
||||
private String dataSourceId;
|
||||
private String sqlId;
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,200 @@
|
||||
package com.bsmlab.dfx.agent.task.postman;
|
||||
|
||||
import com.bsmlab.dfx.agent.config.AgentConfigDto;
|
||||
import com.bsmlab.dfx.agent.config.AgentConfigReader;
|
||||
import com.bsmlab.dfx.agent.config.datasource.SqlExecuteService;
|
||||
import com.bsmlab.dfx.agent.listener.dto.AckDto;
|
||||
import com.bsmlab.dfx.agent.listener.dto.ReceiveMessageDto;
|
||||
import com.bsmlab.dfx.agent.support.exception.DfxException;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.core.io.FileSystemResource;
|
||||
import org.springframework.http.HttpEntity;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
|
||||
import org.springframework.scheduling.support.CronTrigger;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.LinkedMultiValueMap;
|
||||
import org.springframework.util.MultiValueMap;
|
||||
import org.springframework.web.client.RestTemplate;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.CopyOption;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.StandardCopyOption;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class PostmanSchedulerService {
|
||||
private final ThreadPoolTaskScheduler threadPoolTaskScheduler;
|
||||
private final AgentConfigReader agentConfigReader;
|
||||
private final SqlExecuteService sqlExecuteService;
|
||||
private Map<String, ScheduledFuture<?>> scheduledFutureMap = new HashMap<>();
|
||||
private Map<String, AgentConfigDto.Postman> postmanMap = new HashMap<>();
|
||||
|
||||
private void startPostman(String postmanId) {
|
||||
AgentConfigDto.Postman postman = this.postmanMap.get(postmanId);
|
||||
this.stop(postmanId);
|
||||
String cron = postman.getAction().getCron();
|
||||
ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(() -> run(postman), new CronTrigger(cron));
|
||||
scheduledFutureMap.put(postmanId, scheduledFuture);
|
||||
}
|
||||
|
||||
private void stop(String postmanId) {
|
||||
if(this.scheduledFutureMap.get(postmanId) != null) {
|
||||
ScheduledFuture<?> scheduledFuture = this.scheduledFutureMap.get(postmanId);
|
||||
scheduledFuture.cancel(false);
|
||||
this.scheduledFutureMap.remove(postmanId);
|
||||
}
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
public void launch() {
|
||||
List<AgentConfigDto.Postman> postmanList = agentConfigReader.getScheduledTypePostmanList();
|
||||
for(AgentConfigDto.Postman postman : postmanList) {
|
||||
this.startPostman(postman.getPostmanId());
|
||||
}
|
||||
}
|
||||
|
||||
public void run(AgentConfigDto.Postman postman) {
|
||||
ObjectMapper objectMapper = new ObjectMapper();
|
||||
String senderHostId = agentConfigReader.getAgentConfigDto().getHostId();
|
||||
long senderTimestamp = System.currentTimeMillis();
|
||||
String messageUuid = UUID.randomUUID().toString();
|
||||
// DB TO DB 전송
|
||||
if(AgentConfigDto.TaskType.DB_READ_THEN_SEND == postman.getTaskType()) {
|
||||
String dataSourceId = postman.getMessage().getDataSourceId();
|
||||
String sqlId = postman.getMessage().getSqlId();
|
||||
try {
|
||||
List<Map<String, Object>> dataMapList = sqlExecuteService.select(dataSourceId, sqlId, null);
|
||||
String dataString = objectMapper.writeValueAsString(dataMapList);
|
||||
ReceiveMessageDto receiveMessageDto = ReceiveMessageDto.builder().senderHostId(senderHostId).senderTimestamp(senderTimestamp)
|
||||
.messageUuid(messageUuid).messageType(AgentConfigDto.MessageType.TRANSFER_DB_TO_DB)
|
||||
.recipientHostId(postman.getRecipientHostId()).recipientDropBoxId(postman.getRecipientDropBoxId())
|
||||
.data(dataString).build();
|
||||
HttpHeaders httpHeaders = new HttpHeaders();
|
||||
httpHeaders.setContentType(MediaType.APPLICATION_JSON);
|
||||
HttpEntity<ReceiveMessageDto> bodyEntity = new HttpEntity<>(receiveMessageDto, httpHeaders);
|
||||
RestTemplate restTemplate = new RestTemplate();
|
||||
AgentConfigDto.KnownAgent knownAgent = agentConfigReader.getKnownAgent(postman.getRecipientHostId());
|
||||
String url = "https://" + knownAgent.getHostName() + ":" + knownAgent.getListenPort() + "/listen";
|
||||
String response = restTemplate.postForObject(url, bodyEntity, String.class);
|
||||
AckDto ackDto = objectMapper.readValue(response, new TypeReference<AckDto>() {});
|
||||
if(AckDto.ResultType.RECEIVE_SUCCESS != ackDto.getResult()) {
|
||||
throw new DfxException(postman.getRecipientHostId() + "에게 전송하였으나 상대방이 수신하지 못하였습니다." + response);
|
||||
}
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch (DfxException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
else if(AgentConfigDto.TaskType.FILE_READ_THEN_SEND == postman.getTaskType()) {
|
||||
File rootDirectoryfile = new File(postman.getMessage().getWatchDirectory());
|
||||
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy/MM/dd/hh/mm/ss");
|
||||
Date today = new Date(System.currentTimeMillis());
|
||||
String dateString = simpleDateFormat.format(today);
|
||||
List<File> fileReadyList = new ArrayList<>();
|
||||
List<File> workingFileList = new ArrayList<>();
|
||||
if(rootDirectoryfile.isDirectory() && rootDirectoryfile.listFiles().length > 0) {
|
||||
// 대상 파일 찾기 -> 작업 디렉토리(working)로 이동 -> 작업 완료 후 완료 디렉토리(done)로 이동
|
||||
File[] files = rootDirectoryfile.listFiles();
|
||||
for(File file : files) {
|
||||
if(file.isFile()) {
|
||||
fileReadyList.add(file);
|
||||
}
|
||||
}
|
||||
List<Map<String, Object>> data = new ArrayList<>();
|
||||
String dataString = null;
|
||||
String workingDirectory = rootDirectoryfile.getAbsolutePath() + "/working/" + dateString;
|
||||
File workingDirectoryFile = new File(workingDirectory);
|
||||
if(!workingDirectoryFile.exists()) {
|
||||
workingDirectoryFile.mkdirs();
|
||||
}
|
||||
try {
|
||||
// 작업 디렉토리(working)로 이동
|
||||
for(File file : fileReadyList) {
|
||||
Files.move(file.toPath(), workingDirectoryFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
|
||||
workingFileList.add(new File(workingDirectoryFile.getAbsoluteFile() + "/" + file.getName()));
|
||||
}
|
||||
// ReceiveMessageDto - messageJson 만들기
|
||||
for(File file : workingFileList) {
|
||||
Map<String, Object> map = new HashMap<>();
|
||||
Map<String, Object> paramMap = new HashMap<>();
|
||||
map.put("file-name", file.getName());
|
||||
if(postman.getMessage() != null && StringUtils.isNotEmpty(postman.getMessage().getMetaDropBoxId())
|
||||
&& StringUtils.isNotEmpty(postman.getMessage().getMetaDataSqlId()) && StringUtils.isNotEmpty(postman.getMessage().getMetaDataDataSourceId())) {
|
||||
map.put("meta-drop-box-id", postman.getMessage().getMetaDropBoxId());
|
||||
paramMap.put("fileName", file.getName());
|
||||
List<Map<String, Object>> messageDataMapList = sqlExecuteService.select(postman.getMessage().getDataSourceId(), postman.getMessage().getMetaDataSqlId(), paramMap);
|
||||
if(messageDataMapList != null && messageDataMapList.get(0) != null) {
|
||||
map.put("meta-data", messageDataMapList.get(0));
|
||||
}
|
||||
}
|
||||
data.add(map);
|
||||
}
|
||||
dataString = objectMapper.writeValueAsString(data);
|
||||
ReceiveMessageDto receiveMessageDto = ReceiveMessageDto.builder().senderHostId(senderHostId).senderTimestamp(senderTimestamp)
|
||||
.messageUuid(messageUuid).messageType(AgentConfigDto.MessageType.TRANSFER_DB_TO_DB)
|
||||
.recipientHostId(postman.getRecipientHostId()).recipientDropBoxId(postman.getRecipientDropBoxId())
|
||||
.data(dataString).build();
|
||||
String messageString = objectMapper.writeValueAsString(receiveMessageDto);
|
||||
// http 준비
|
||||
HttpHeaders httpHeaders = new HttpHeaders();
|
||||
httpHeaders.setContentType(MediaType.MULTIPART_FORM_DATA);
|
||||
MultiValueMap<String, Object> body = new LinkedMultiValueMap<>();
|
||||
// 첫 번째 멀티파트는 message json
|
||||
body.add("json", new HttpEntity<>(messageString, httpHeaders));
|
||||
// 두 번째 이후 멀티파트는 파일
|
||||
for(File file : workingFileList) {
|
||||
body.add(file.getName(), new FileSystemResource(file));
|
||||
}
|
||||
// 전송
|
||||
RestTemplate restTemplate = new RestTemplate();
|
||||
AgentConfigDto.KnownAgent knownAgent = agentConfigReader.getKnownAgent(postman.getRecipientHostId());
|
||||
String url = "https://" + knownAgent.getHostName() + ":" + knownAgent.getListenPort() + "/listen";
|
||||
String response = restTemplate.postForObject(url, body, String.class);
|
||||
AckDto ackDto = objectMapper.readValue(response, new TypeReference<AckDto>() {});
|
||||
if(AckDto.ResultType.RECEIVE_SUCCESS != ackDto.getResult()) {
|
||||
throw new DfxException(postman.getRecipientHostId() + "에게 전송하였으나 상대방이 수신하지 못하였습니다." + response);
|
||||
}
|
||||
else {
|
||||
// 작업 완료(done) 디렉토리로 이동
|
||||
String doneDirectory = rootDirectoryfile + "/done/" + dateString;
|
||||
File doneDirectoryFile = new File(doneDirectory);
|
||||
if(!doneDirectoryFile.exists()) {
|
||||
doneDirectoryFile.mkdirs();
|
||||
}
|
||||
for(File file : workingFileList) {
|
||||
Files.move(file.toPath(), workingDirectoryFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
|
||||
}
|
||||
}
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch (DfxException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
// Path startPath = Paths.get("C:/example"); // 시작 디렉토리 경로
|
||||
//
|
||||
// try (Stream<Path> stream = Files.walk(startPath)) {
|
||||
// stream.filter(Files::isRegularFile)
|
||||
// .forEach(System.out::println);
|
||||
// }
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in new issue