qcast-api/src/main/java/com/interplug/qcast/batch/JobLauncherController.java

384 lines
13 KiB
Java

package com.interplug.qcast.batch;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.*;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.context.event.EventListener;
import java.time.LocalDateTime;
@RestController
@RequiredArgsConstructor
@Slf4j
public class JobLauncherController {
private final Map<String, Job> jobs; // 여러 Job을 주입받도록 변경
private final JobLauncher jobLauncher;
private final JobExplorer jobExplorer;
private final JobRepository jobRepository; // 생성자 주입을 위해 필드 추가
@Value("${qsp.master-admin-user-batch-url}")
private String qspInterfaceUrl;
@Value("${spring.profiles.scheduler}")
private String scheduler;
/**
* 서버 시작 시 실행 중(STARTED)인 상태로 남은 Job들을 FAILED 처리
*/
@EventListener(ApplicationReadyEvent.class)
public void resetFailedJobs() {
log.info("Checking for 'STARTED' jobs to reset after reboot...");
for (String jobName : jobs.keySet()) {
Set<JobExecution> runningExecutions = jobExplorer.findRunningJobExecutions(jobName);
for (JobExecution execution : runningExecutions) {
execution.setStatus(BatchStatus.FAILED);
execution.setExitStatus(ExitStatus.FAILED.addExitDescription("Reset on application startup"));
execution.setEndTime(LocalDateTime.now()); // new Date() 대신 LocalDateTime.now() 사용
jobRepository.update(execution);
log.info("Reset job execution {} for job {}", execution.getId(), jobName);
}
}
}
/**
* 특정 Job을 매핑으로 실행하는 메소드
*
* @param jobName
* @return
* @throws JobInstanceAlreadyCompleteException
* @throws JobExecutionAlreadyRunningException
* @throws JobParametersInvalidException
* @throws JobRestartException
*/
@GetMapping("/batch/job/{jobName}") // Path Variable로 jobName을 받음
public Map<String, Object> launchJob(@PathVariable String jobName)
throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException,
JobParametersInvalidException, JobRestartException {
Job job = jobs.get(jobName);
Map<String, Object> resultMap = new HashMap<String, Object>();
if (job == null) {
// return "Job " + jobName + " not found";
resultMap.put("code", "FAILED");
resultMap.put("message", "Job" + jobName + " not found");
return resultMap;
}
// 실행 중인 Job 확인 (데이터베이스 기반)
if (isJobRunning(jobName)) {
log.warn("Job {} is already running, skipping execution", jobName);
resultMap.put("code", "FAILED");
resultMap.put("message", "Job "+ jobName +" is already running, skipping execution");
return resultMap;
}
try {
log.info("Starting job: {}", jobName);
JobParameters jobParameters = new JobParametersBuilder().addString("jobName", jobName)
.addDate("time", new Date()).toJobParameters();
JobExecution jobExecution = jobLauncher.run(job, jobParameters);
BatchStatus status = jobExecution.getStatus();
ExitStatus exitStatus = jobExecution.getExitStatus();
resultMap.put("code", status.toString());
resultMap.put("message", exitStatus.getExitDescription());
// return "Job " + jobName + " started";
return resultMap;
} catch (Exception e) {
resultMap.put("code", "FAILED");
resultMap.put("message", e.getMessage());
return resultMap;
}
}
/**
* Q.CAST 판매점 / 사용자 / 즐겨찾기 / 노출 아이템 동기화 배치
*
*/
// @Scheduled(cron = "*/5 * * * * *")
@Scheduled(cron = "0 50 23 * * *")
public String storeAdditionalJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
return executeScheduledJob("storeAdditionalJob");
}
/**
* 아이템 동기화 배치
*
*/
@Scheduled(cron = "0 30 02 * * *")
public String materialJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
return executeScheduledJob("materialJob");
}
/**
* BOM 아이템 동기화 배치
*
*/
@Scheduled(cron = "0 40 02 * * *")
public String bomJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
return executeScheduledJob("bomJob");
}
/**
* 영업사원 동기화 배치
*
*/
@Scheduled(cron = "0 40 03 * * *")
public String businessChargerJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
return executeScheduledJob("businessChargerJob");
}
/**
* 관리자 유저 동기화 배치
*
*/
@Scheduled(cron = "0 30 01 * * *")
public String adminUserJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
return executeScheduledJob("adminUserJob");
}
/**
* 가격 동기화 배치
*
*/
@Scheduled(cron = "0 20 00 * * *")
public String priceJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
return executeScheduledJob("priceJob");
}
/**
* 공통코드 M_COMM_H, M_COMM_L 동기화 배치
*
*/
@Scheduled(cron = "0 10 03 * * *")
public String commonCodeJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
return executeScheduledJob("commonCodeJob");
}
/**
* Q.CAST 견적특이사항 / 아이템 표시, 미표시 동기화 배치
*
*/
@Scheduled(cron = "0 30 23 * * *")
public String specialNoteDispItemAdditionalInfoJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
return executeScheduledJob("specialNoteDispItemAdditionalJob");
}
/**
* Plan Confrim 동기화 배치
*
*/
@Scheduled(cron = "0 05 04 * * *")
public String planConfirmJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
return executeScheduledJob("planConfirmJob");
}
/**
* 견적서 전송 동기화 배치
*
*/
@Scheduled(cron = "0 20 04 * * *")
public String estimateSyncJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
return executeScheduledJob("estimateSyncJob");
}
/**
* 공통 스케줄러 실행 메소드
*/
private String executeScheduledJob(String jobName) throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
Job job = jobs.get(jobName);
if (job == null) {
log.error("Job {} not found", jobName);
return "Job " + jobName + " not found";
}
// if (!"Y".equals(scheduler) &&
// !"materialJob".equals(jobName) &&
// !"commonCodeJob".equals(jobName) &&
// !"specialNoteDispItemAdditionalJob".equals(jobName) &&
// !"planConfirmJob".equals(jobName) &&
// !"estimateSyncJob".equals(jobName) &&
// !"bomJob".equals(jobName)){
// log.info("Scheduler disabled, skipping job {}", jobName);
// return "Scheduler disabled";
// }
// 허용된 작업 목록 정의
Set<String> allowedJobs = new HashSet<>(Arrays.asList(
"materialJob",
"commonCodeJob",
"specialNoteDispItemAdditionalJob",
"planConfirmJob",
"estimateSyncJob",
"bomJob",
"storeAdditionalJob",
"businessChargerJob",
"adminUserJob"
));
// 스케줄러가 비활성화되어 있고, 허용된 작업이 아닌 경우
if (!"Y".equals(scheduler) && !allowedJobs.contains(jobName)) {
log.info("스케줄러가 비활성화되어 작업을 건너뜁니다: {}", jobName);
return "Scheduler disabled";
}
// 실행 중인 Job 확인 (데이터베이스 기반)
if (isJobRunning(jobName)) {
log.warn("Job {} is already running, skipping execution", jobName);
return "Job already running";
}
// JobParameters jobParameters =
// new JobParametersBuilder().addDate("time", new Date()).toJobParameters();
//
// jobLauncher.run(job, jobParameters);
//
// return jobName+ " executed successfully";
// 재시도 로직 추가
int maxRetries = 3;
int retryCount = 0;
long initialDelay = 60000; // 1분
while (retryCount < maxRetries) {
try {
JobParameters jobParameters = new JobParametersBuilder()
.addString("jobName", jobName)
.addDate("time", new Date())
.addLong("run.id", System.currentTimeMillis())
.toJobParameters();
log.info("Job {} 실행 시도 (재시도 {}/{})", jobName, retryCount + 1, maxRetries);
JobExecution jobExecution = jobLauncher.run(job, jobParameters);
log.info("Job {} 완료 상태: {}", jobName, jobExecution.getExitStatus().getExitCode());
return jobName + " executed successfully";
} catch (Exception e) {
retryCount++;
// 데드락 오류인지 확인
boolean isDeadlock = e.getCause() != null &&
e.getCause().getMessage() != null &&
e.getCause().getMessage().toLowerCase().contains("deadlock");
if (isDeadlock && retryCount < maxRetries) {
long delay = initialDelay * retryCount; // 1분, 2분, 3분 증가
log.warn("Job {} 실행 중 데드락 발생. {}/{} 재시도 ({}ms 후 재시도)...",
jobName, retryCount, maxRetries, delay, e);
try {
Thread.sleep(delay);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
log.error("Job {} 재시도 중 인터럽트 발생", jobName, ie);
return "Job " + jobName + " interrupted during retry";
}
} else {
log.error("Job {} 실행 실패 (재시도 {}/{})", jobName, retryCount, maxRetries, e);
return "Error executing job " + jobName + ": " + e.getMessage();
}
}
}
return "Job " + jobName + " failed after " + maxRetries + " retries";
}
/**
* Job 실행 상태 확인
*/
private boolean isJobRunning(String jobName) {
try {
Set<JobExecution> runningExecutions = jobExplorer.findRunningJobExecutions(jobName);
if (runningExecutions.isEmpty()) {
return false;
}
boolean isActuallyBlocked = false;
for (JobExecution execution : runningExecutions) {
LocalDateTime startTime = execution.getStartTime();
if (startTime == null) continue;
// 현재 시간과 시작 시간의 차이 계산
Duration duration = Duration.between(startTime, LocalDateTime.now());
long hours = duration.toHours();
if (hours >= 1) {
// 1시간 이상 경과한 경우 로그 출력 및 DB 강제 업데이트
log.warn("Job {} (Execution ID: {}) 가 실행된 지 {}시간이 지났습니다. 상태를 FAILED로 초기화하고 재실행을 시도합니다.",
jobName, execution.getId(), hours);
execution.setStatus(BatchStatus.FAILED);
execution.setExitStatus(ExitStatus.FAILED.addExitDescription("Forcefully reset: Running for more than 1 hour"));
execution.setEndTime(LocalDateTime.now());
jobRepository.update(execution);
log.info("Job {} 의 이전 실행 상태가 성공적으로 초기화되었습니다.", jobName);
} else {
// 1시간 미만으로 실행 중인 잡이 있다면 진짜 실행 중인 것으로 간주
log.info("Job {} 가 현재 실행 중입니다. (시작 시간: {}, 경과 시간: {}분)",
jobName, startTime, duration.toMinutes());
isActuallyBlocked = true;
}
}
return isActuallyBlocked;
} catch (Exception e) {
log.error("Job 상태 확인 중 오류 발생: {}", e.getMessage());
return false;
}
}
}