job 중복실행 방지

This commit is contained in:
cha 2025-07-17 11:25:01 +09:00
parent d11defb797
commit 1dc0edb5e5

View File

@ -1,6 +1,5 @@
package com.interplug.qcast.batch; package com.interplug.qcast.batch;
import java.time.Duration;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -9,8 +8,9 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.*; import org.springframework.batch.core.*;
import org.springframework.batch.core.explore.JobExplorer; import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher; import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
@ -21,10 +21,10 @@ import org.springframework.web.bind.annotation.RestController;
@RequiredArgsConstructor @RequiredArgsConstructor
@Slf4j @Slf4j
public class JobLauncherController { public class JobLauncherController {
private final Map<String, Job> jobs; private final Map<String, Job> jobs; // 여러 Job을 주입받도록 변경
private final JobLauncher jobLauncher; private final JobLauncher jobLauncher;
private final JobExplorer jobExplorer; private final JobExplorer jobExplorer;
private final JobOperator jobOperator;
@Value("${qsp.master-admin-user-batch-url}") @Value("${qsp.master-admin-user-batch-url}")
private String qspInterfaceUrl; private String qspInterfaceUrl;
@ -34,205 +34,207 @@ public class JobLauncherController {
// 현재 실행 중인 Job 추적을 위한 Set // 현재 실행 중인 Job 추적을 위한 Set
private final Set<String> runningJobs = ConcurrentHashMap.newKeySet(); private final Set<String> runningJobs = ConcurrentHashMap.newKeySet();
/** /**
* 특정 Job을 매핑으로 실행하는 메소드 * 특정 Job을 매핑으로 실행하는 메소드
*
* @param jobName
* @return
* @throws JobInstanceAlreadyCompleteException
* @throws JobExecutionAlreadyRunningException
* @throws JobParametersInvalidException
* @throws JobRestartException
*/ */
@GetMapping("/batch/job/{jobName}") @GetMapping("/batch/job/{jobName}") // Path Variable로 jobName을 받음
public Map<String, Object> launchJob(@PathVariable String jobName) { public Map<String, Object> launchJob(@PathVariable String jobName)
Job job = jobs.get(jobName); throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException,
Map<String, Object> resultMap = new HashMap<>(); JobParametersInvalidException, JobRestartException {
Job job = jobs.get(jobName);
Map<String, Object> resultMap = new HashMap<String, Object>();
if (job == null) { if (job == null) {
// return "Job " + jobName + " not found";
resultMap.put("code", "FAILED"); resultMap.put("code", "FAILED");
resultMap.put("message", "Job " + jobName + " not found"); resultMap.put("message", "Job" + jobName + " not found");
return resultMap; return resultMap;
} }
// 실행 중인 Job 확인 // 실행 중인 Job 확인
if (runningJobs.contains(jobName) || isJobRunning(jobName)) { if (runningJobs.contains(jobName) || isJobRunning(jobName)) {
resultMap.put("code", "ALREADY_RUNNING"); log.warn("Job {} is already running, skipping execution", jobName);
resultMap.put("message", "Job " + jobName + " is already running"); resultMap.put("code", "FAILED");
resultMap.put("message", "Job "+ jobName +" is already running, skipping execution");
return resultMap; return resultMap;
} }
try {
runningJobs.add(jobName);
JobParameters jobParameters = new JobParametersBuilder() JobParameters jobParameters = new JobParametersBuilder().addString("jobName", jobName)
.addString("jobName", jobName) .addDate("time", new Date()).toJobParameters();
.addDate("time", new Date())
.toJobParameters();
JobExecution jobExecution = jobLauncher.run(job, jobParameters); JobExecution jobExecution = jobLauncher.run(job, jobParameters);
BatchStatus status = jobExecution.getStatus(); BatchStatus status = jobExecution.getStatus();
ExitStatus exitStatus = jobExecution.getExitStatus(); ExitStatus exitStatus = jobExecution.getExitStatus();
resultMap.put("code", status.toString()); resultMap.put("code", status.toString());
resultMap.put("message", exitStatus.getExitDescription()); resultMap.put("message", exitStatus.getExitDescription());
} catch (JobExecutionAlreadyRunningException e) {
resultMap.put("code", "ALREADY_RUNNING");
resultMap.put("message", "Job " + jobName + " is already running");
} catch (Exception e) {
resultMap.put("code", "FAILED");
resultMap.put("message", "Error: " + e.getMessage());
} finally {
runningJobs.remove(jobName);
}
// return "Job " + jobName + " started";
return resultMap; return resultMap;
} }
/** /**
* Q.CAST 판매점 / 사용자 / 즐겨찾기 / 노출 아이템 동기화 배치 * Q.CAST 판매점 / 사용자 / 즐겨찾기 / 노출 아이템 동기화 배치
*
*/ */
@Scheduled(cron = "0 30 23 * * *") // @Scheduled(cron = "*/5 * * * * *")
public String storeAdditionalJob() { @Scheduled(cron = "0 50 23 * * *")
public String storeAdditionalJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
return executeScheduledJob("storeAdditionalJob"); return executeScheduledJob("storeAdditionalJob");
} }
/** /**
* 아이템 동기화 배치 * 아이템 동기화 배치
*
*/ */
@Scheduled(cron = "0 30 02 * * *") @Scheduled(cron = "0 30 02 * * *")
public String materialJob() { public String materialJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
return executeScheduledJob("materialJob"); return executeScheduledJob("materialJob");
} }
/** /**
* BOM 아이템 동기화 배치 * BOM 아이템 동기화 배치
*
*/ */
@Scheduled(cron = "0 40 02 * * *") @Scheduled(cron = "0 40 02 * * *")
public String bomJob() { public String bomJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
return executeScheduledJob("bomJob"); return executeScheduledJob("bomJob");
} }
/** /**
* 영업사원 동기화 배치 * 영업사원 동기화 배치
*
*/ */
@Scheduled(cron = "0 40 03 * * *") @Scheduled(cron = "0 40 03 * * *")
public String businessChargerJob() { public String businessChargerJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
return executeScheduledJob("businessChargerJob"); return executeScheduledJob("businessChargerJob");
} }
/** /**
* 관리자 유저 동기화 배치 * 관리자 유저 동기화 배치
*
*/ */
@Scheduled(cron = "0 0 01 * * *") @Scheduled(cron = "0 30 01 * * *")
public String adminUserJob() { public String adminUserJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
return executeScheduledJob("adminUserJob"); return executeScheduledJob("adminUserJob");
} }
/** /**
* 가격 동기화 배치 * 가격 동기화 배치
*
*/ */
@Scheduled(cron = "0 30 0 * * *") // 시간 조정: 00:30 @Scheduled(cron = "0 20 00 * * *")
public String priceJob() { public String priceJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
return executeScheduledJob("priceJob"); return executeScheduledJob("priceJob");
} }
/** /**
* 공통코드 M_COMM_H, M_COMM_L 동기화 배치 * 공통코드 M_COMM_H, M_COMM_L 동기화 배치
*
*/ */
@Scheduled(cron = "0 10 03 * * *") @Scheduled(cron = "0 10 03 * * *")
public String commonCodeJob() { public String commonCodeJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
return executeScheduledJob("commonCodeJob"); return executeScheduledJob("commonCodeJob");
} }
/** /**
* Q.CAST 견적특이사항 / 아이템 표시, 미표시 동기화 배치 * Q.CAST 견적특이사항 / 아이템 표시, 미표시 동기화 배치
*
*/ */
@Scheduled(cron = "0 45 23 * * *") // 시간 조정: 23:45 @Scheduled(cron = "0 30 23 * * *")
public String specialNoteDispItemAdditionalInfoJob() { public String specialNoteDispItemAdditionalInfoJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
return executeScheduledJob("specialNoteDispItemAdditionalJob"); return executeScheduledJob("specialNoteDispItemAdditionalJob");
} }
/** /**
* Plan Confrim 동기화 배치 * Plan Confrim 동기화 배치
*
*/ */
@Scheduled(cron = "0 05 04 * * *") @Scheduled(cron = "0 05 04 * * *")
public String planConfirmJob() { public String planConfirmJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
return executeScheduledJob("planConfirmJob"); return executeScheduledJob("planConfirmJob");
} }
/** /**
* 견적서 전송 동기화 배치 * 견적서 전송 동기화 배치
*
*/ */
@Scheduled(cron = "0 20 04 * * *") @Scheduled(cron = "0 20 04 * * *")
public String estimateSyncJob() { public String estimateSyncJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
return executeScheduledJob("estimateSyncJob"); return executeScheduledJob("estimateSyncJob");
} }
/** /**
* 공통 스케줄러 실행 메소드 * 공통 스케줄러 실행 메소드
*/ */
private String executeScheduledJob(String jobName) { private String executeScheduledJob(String jobName) throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
// 1. 스케줄러 설정 확인 (materialJob, commonCodeJob 예외)
if (!"Y".equals(scheduler) && !"materialJob".equals(jobName) && !"commonCodeJob".equals(jobName)) {
log.info("Scheduler disabled, skipping job {}", jobName);
return "Scheduler disabled";
}
// 2. Job 존재 확인
Job job = jobs.get(jobName); Job job = jobs.get(jobName);
if (job == null) { if (job == null) {
log.error("Job {} not found", jobName); log.error("Job {} not found", jobName);
return "Job " + jobName + " not found"; return "Job " + jobName + " not found";
} }
// 3. 같은 Job이 실행 중인지 확인 if (!"Y".equals(scheduler)
if (isJobActuallyRunning(jobName)) { && !"materialJob".equals(jobName)
log.warn("Job {} is already running", jobName); && !"commonCodeJob".equals(jobName)) {
log.info("Scheduler disabled, skipping job {}", jobName);
return "Scheduler disabled";
}
// 실행 중인 Job 확인
if (runningJobs.contains(jobName) || isJobRunning(jobName)) {
log.warn("Job {} is already running, skipping execution", jobName);
return "Job already running"; return "Job already running";
} }
// 4. 다른 Job이 실행 중인지 확인 JobParameters jobParameters =
String runningJobName = findCurrentlyRunningJob(); new JobParametersBuilder().addDate("time", new Date()).toJobParameters();
if (runningJobName != null) {
log.warn("Another job '{}' is running, skipping job {}", runningJobName, jobName);
return "Another job is running: " + runningJobName;
}
// 5. Job 실행 jobLauncher.run(job, jobParameters);
try {
runningJobs.add(jobName);
log.info("Starting job {}", jobName);
JobParameters jobParameters = new JobParametersBuilder() return jobName+ " executed successfully";
.addDate("time", new Date())
.toJobParameters();
JobExecution jobExecution = jobLauncher.run(job, jobParameters);
log.info("Job {} started successfully with execution ID: {}", jobName, jobExecution.getId());
return "OK";
} catch (JobExecutionAlreadyRunningException e) {
log.warn("Job {} is already running: {}", jobName, e.getMessage());
return "Job already running";
} catch (Exception e) {
log.error("Error executing job {}: {}", jobName, e.getMessage());
return "Error: " + e.getMessage();
} finally {
runningJobs.remove(jobName);
}
}
/**
* 현재 실행 중인 Job 찾기
*/
private String findCurrentlyRunningJob() {
String[] jobNames = {"storeAdditionalJob", "materialJob", "bomJob", "businessChargerJob",
"adminUserJob", "priceJob", "commonCodeJob", "specialNoteDispItemAdditionalJob",
"planConfirmJob", "estimateSyncJob"};
for (String jobName : jobNames) {
if (isJobActuallyRunning(jobName)) {
return jobName;
}
}
return null;
} }
/** /**
@ -256,216 +258,4 @@ public class JobLauncherController {
return false; return false;
} }
} }
/**
* Job이 실제로 실행 중인지 확인
*/
private boolean isJobActuallyRunning(String jobName) {
try {
// 1. 메모리 Set에서 확인
if (runningJobs.contains(jobName)) {
log.debug("Job {} found in runningJobs set", jobName);
// 2. JobOperator로 실제 실행 상태 확인
try {
Set<Long> runningExecutions = jobOperator.getRunningExecutions(jobName);
if (!runningExecutions.isEmpty()) {
log.debug("Job {} has {} running executions", jobName, runningExecutions.size());
return true;
} else {
log.warn("Job {} in runningJobs set but no actual running executions found", jobName);
runningJobs.remove(jobName); // 메모리 Set 정리
return false;
}
} catch (Exception e) {
log.warn("Error checking JobOperator for {}: {}", jobName, e.getMessage());
return false;
}
}
// 3. JobOperator로 직접 확인
try {
Set<Long> runningExecutions = jobOperator.getRunningExecutions(jobName);
if (!runningExecutions.isEmpty()) {
log.info("Job {} has running executions but not in runningJobs set", jobName);
runningJobs.add(jobName); // 메모리 Set 동기화
return true;
}
} catch (Exception e) {
log.warn("Error checking JobOperator for {}: {}", jobName, e.getMessage());
}
// 4. JobExplorer로 DB 상태 확인 (보조적)
return isJobRunningInDatabase(jobName);
} catch (Exception e) {
log.error("Error checking job running status for {}: {}", jobName, e.getMessage());
return false;
}
}
/**
* DB에서 Job 실행 상태 확인 (시간 기반 필터링)
*/
private boolean isJobRunningInDatabase(String jobName) {
try {
List<JobInstance> jobInstances = jobExplorer.findJobInstancesByJobName(jobName, 0, 1);
if (jobInstances.isEmpty()) {
return false;
}
JobInstance latestJobInstance = jobInstances.get(0);
List<JobExecution> jobExecutions = jobExplorer.getJobExecutions(latestJobInstance);
if (jobExecutions.isEmpty()) {
return false;
}
JobExecution latestExecution = jobExecutions.get(0);
BatchStatus status = latestExecution.getStatus();
if (status == BatchStatus.STARTED || status == BatchStatus.STARTING) {
if (latestExecution.getStartTime() != null) {
long minutesAgo = java.time.Duration.between(
latestExecution.getStartTime(),
java.time.LocalDateTime.now()
).toMinutes();
// 30분 이상 실행은 중단된 것으로 간주
if (minutesAgo > 30) {
log.warn("Job {} has stale execution in DB (started {} minutes ago)", jobName, minutesAgo);
return false;
}
log.debug("Job {} appears to be running in DB (started {} minutes ago)", jobName, minutesAgo);
return true;
}
}
return false;
} catch (Exception e) {
log.error("Error checking job running status in DB for {}: {}", jobName, e.getMessage());
return false;
}
}
/**
* 현재 실행 중인 Job 목록 조회
*/
@GetMapping("/batch/running-jobs")
public Map<String, Object> getRunningJobs() {
Map<String, Object> result = new HashMap<>();
result.put("runningJobs", new ArrayList<>(runningJobs));
Map<String, Object> jobStatuses = new HashMap<>();
String[] jobNames = {"storeAdditionalJob", "materialJob", "bomJob", "businessChargerJob",
"adminUserJob", "priceJob", "commonCodeJob", "specialNoteDispItemAdditionalJob",
"planConfirmJob", "estimateSyncJob"};
for (String jobName : jobNames) {
jobStatuses.put(jobName, getJobStatus(jobName));
}
result.put("jobStatuses", jobStatuses);
return result;
}
/**
* 특정 Job의 상태 정보 조회
*/
private Map<String, Object> getJobStatus(String jobName) {
Map<String, Object> jobStatus = new HashMap<>();
try {
List<JobInstance> jobInstances = jobExplorer.findJobInstancesByJobName(jobName, 0, 1);
if (jobInstances.isEmpty()) {
jobStatus.put("status", "NEVER_EXECUTED");
return jobStatus;
}
JobInstance latestJobInstance = jobInstances.get(0);
List<JobExecution> jobExecutions = jobExplorer.getJobExecutions(latestJobInstance);
if (!jobExecutions.isEmpty()) {
JobExecution latestExecution = jobExecutions.get(0);
jobStatus.put("status", latestExecution.getStatus().toString());
jobStatus.put("startTime", latestExecution.getStartTime());
jobStatus.put("endTime", latestExecution.getEndTime());
// Duration 계산 (LocalDateTime용)
if (latestExecution.getEndTime() != null && latestExecution.getStartTime() != null) {
Duration duration = Duration.between(latestExecution.getStartTime(), latestExecution.getEndTime());
jobStatus.put("durationSeconds", duration.getSeconds());
jobStatus.put("durationMillis", duration.toMillis());
}
}
} catch (Exception e) {
jobStatus.put("status", "ERROR");
jobStatus.put("error", e.getMessage());
}
return jobStatus;
}
/**
* 실행 중인 Job 상태 상세 확인
*/
@GetMapping("/batch/debug/running-jobs-detail")
public Map<String, Object> getRunningJobsDetail() {
Map<String, Object> result = new HashMap<>();
String[] jobNames = {"storeAdditionalJob", "materialJob", "bomJob", "businessChargerJob",
"adminUserJob", "priceJob", "commonCodeJob", "specialNoteDispItemAdditionalJob",
"planConfirmJob", "estimateSyncJob"};
Map<String, Object> jobDetails = new HashMap<>();
List<String> actuallyRunningJobs = new ArrayList<>();
for (String jobName : jobNames) {
Map<String, Object> jobDetail = new HashMap<>();
// 메모리 Set 확인
boolean inMemorySet = runningJobs.contains(jobName);
jobDetail.put("inMemorySet", inMemorySet);
// JobOperator 확인
try {
Set<Long> runningExecutions = jobOperator.getRunningExecutions(jobName);
boolean hasRunningExecutions = !runningExecutions.isEmpty();
jobDetail.put("runningExecutions", runningExecutions);
jobDetail.put("hasRunningExecutions", hasRunningExecutions);
} catch (Exception e) {
jobDetail.put("operatorError", e.getMessage());
jobDetail.put("hasRunningExecutions", false);
}
// DB 상태 확인
boolean runningInDB = isJobRunningInDatabase(jobName);
jobDetail.put("runningInDB", runningInDB);
// 실제 실행 여부
boolean actuallyRunning = isJobActuallyRunning(jobName);
jobDetail.put("actuallyRunning", actuallyRunning);
if (actuallyRunning) {
actuallyRunningJobs.add(jobName);
}
jobDetails.put(jobName, jobDetail);
}
result.put("jobDetails", jobDetails);
result.put("actuallyRunningJobs", actuallyRunningJobs);
result.put("isAnyJobRunning", !actuallyRunningJobs.isEmpty());
result.put("schedulerEnabled", scheduler);
return result;
}
} }