diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java index 503cc382..2a84f4f4 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java @@ -32,6 +32,9 @@ import io.serverlessworkflow.impl.scheduler.WorkflowScheduler; import io.serverlessworkflow.impl.schema.SchemaValidator; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -52,6 +55,7 @@ public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData private ScheduledEventConsumer scheculedConsumer; private Cancellable everySchedule; private Cancellable cronSchedule; + private Collection scheduledInstances = new ArrayList<>(); private WorkflowDefinition( WorkflowApplication application, Workflow workflow, ResourceLoader resourceLoader) { @@ -165,6 +169,14 @@ public void addTaskExecutor(WorkflowMutablePosition position, TaskExecutor ta executors.put(position.jsonPointer(), taskExecutor); } + public Collection scheduledInstances() { + return Collections.unmodifiableCollection(scheduledInstances); + } + + public void addScheduledInstance(WorkflowInstance workflowInstance) { + scheduledInstances.add(workflowInstance); + } + @Override public WorkflowDefinitionId id() { return definitionId; @@ -181,6 +193,7 @@ public void close() { if (cronSchedule != null) { cronSchedule.cancel(); } + scheduledInstances.clear(); } @Override diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/DefaultWorkflowScheduler.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/DefaultWorkflowScheduler.java index 338fbca7..0df82569 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/DefaultWorkflowScheduler.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/DefaultWorkflowScheduler.java @@ -15,31 +15,17 @@ */ package io.serverlessworkflow.impl.scheduler; -import io.cloudevents.CloudEvent; import io.serverlessworkflow.impl.WorkflowDefinition; -import io.serverlessworkflow.impl.WorkflowInstance; import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; -public class DefaultWorkflowScheduler implements WorkflowScheduler { +public class DefaultWorkflowScheduler extends ExecutorServiceWorkflowScheduler { - private final Map> instances = - new ConcurrentHashMap<>(); - - private final ScheduledExecutorService service; private final CronResolverFactory cronFactory; public DefaultWorkflowScheduler() { @@ -48,50 +34,15 @@ public DefaultWorkflowScheduler() { public DefaultWorkflowScheduler( ScheduledExecutorService service, CronResolverFactory cronFactory) { - this.service = service; + super(service); this.cronFactory = cronFactory; } - @Override - public Collection scheduledInstances(WorkflowDefinition definition) { - return Collections.unmodifiableCollection(theInstances(definition)); - } - - @Override - public ScheduledEventConsumer eventConsumer( - WorkflowDefinition definition, - Function converter, - EventRegistrationBuilderInfo builderInfo) { - return new ScheduledEventConsumer( - definition, converter, builderInfo, new DefaultScheduledInstanceRunner(definition)); - } - - @Override - public Cancellable scheduleAfter(WorkflowDefinition definition, Duration delay) { - return new ScheduledServiceCancellable( - service.schedule( - new DefaultScheduledInstanceRunner(definition), - delay.toMillis(), - TimeUnit.MILLISECONDS)); - } - - @Override - public Cancellable scheduleEvery(WorkflowDefinition definition, Duration interval) { - long delay = interval.toMillis(); - return new ScheduledServiceCancellable( - service.scheduleAtFixedRate( - new DefaultScheduledInstanceRunner(definition), delay, delay, TimeUnit.MILLISECONDS)); - } - @Override public Cancellable scheduleCron(WorkflowDefinition definition, String cron) { return new CronResolverCancellable(definition, cronFactory.parseCron(cron)); } - private Collection theInstances(WorkflowDefinition definition) { - return instances.computeIfAbsent(definition, def -> new ArrayList<>()); - } - private class CronResolverCancellable implements Cancellable { private final WorkflowDefinition definition; private final CronResolver cronResolver; @@ -126,7 +77,7 @@ public void cancel() { } } - private class CronResolverIntanceRunner extends DefaultScheduledInstanceRunner { + private class CronResolverIntanceRunner extends ScheduledInstanceRunnable { protected CronResolverIntanceRunner(WorkflowDefinition definition) { super(definition); } @@ -140,15 +91,4 @@ public void accept(WorkflowModel model) { } } } - - private class DefaultScheduledInstanceRunner extends ScheduledInstanceRunnable { - protected DefaultScheduledInstanceRunner(WorkflowDefinition definition) { - super(definition); - } - - @Override - protected void addScheduledInstance(WorkflowInstance instance) { - theInstances(definition).add(instance); - } - } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/EventWorkflowScheduler.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/EventWorkflowScheduler.java new file mode 100644 index 00000000..a509b5f3 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/EventWorkflowScheduler.java @@ -0,0 +1,34 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.scheduler; + +import io.cloudevents.CloudEvent; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo; +import java.util.function.Function; + +public abstract class EventWorkflowScheduler implements WorkflowScheduler { + + @Override + public ScheduledEventConsumer eventConsumer( + WorkflowDefinition definition, + Function converter, + EventRegistrationBuilderInfo builderInfo) { + return new ScheduledEventConsumer( + definition, converter, builderInfo, new ScheduledInstanceRunnable(definition)); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ExecutorServiceWorkflowScheduler.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ExecutorServiceWorkflowScheduler.java new file mode 100644 index 00000000..5115f5f5 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ExecutorServiceWorkflowScheduler.java @@ -0,0 +1,58 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.scheduler; + +import io.cloudevents.CloudEvent; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo; +import java.time.Duration; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +public abstract class ExecutorServiceWorkflowScheduler extends EventWorkflowScheduler { + + protected final ScheduledExecutorService service; + + public ExecutorServiceWorkflowScheduler(ScheduledExecutorService service) { + this.service = service; + } + + @Override + public ScheduledEventConsumer eventConsumer( + WorkflowDefinition definition, + Function converter, + EventRegistrationBuilderInfo builderInfo) { + return new ScheduledEventConsumer( + definition, converter, builderInfo, new ScheduledInstanceRunnable(definition)); + } + + @Override + public Cancellable scheduleEvery(WorkflowDefinition definition, Duration interval) { + long delay = interval.toMillis(); + return new ScheduledServiceCancellable( + service.scheduleAtFixedRate( + new ScheduledInstanceRunnable(definition), delay, delay, TimeUnit.MILLISECONDS)); + } + + @Override + public Cancellable scheduleAfter(WorkflowDefinition definition, Duration delay) { + return new ScheduledServiceCancellable( + service.schedule( + new ScheduledInstanceRunnable(definition), delay.toMillis(), TimeUnit.MILLISECONDS)); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java index 76e6ccb9..01f3f6f7 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java @@ -40,7 +40,7 @@ public class ScheduledEventConsumer implements AutoCloseable { private Map> correlatedEvents; private Collection registrations = new ArrayList<>(); - protected ScheduledEventConsumer( + public ScheduledEventConsumer( WorkflowDefinition definition, Function converter, EventRegistrationBuilderInfo builderInfo, diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledInstanceRunnable.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledInstanceRunnable.java index cdb98e58..b767aedf 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledInstanceRunnable.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledInstanceRunnable.java @@ -20,11 +20,11 @@ import io.serverlessworkflow.impl.WorkflowModel; import java.util.function.Consumer; -public abstract class ScheduledInstanceRunnable implements Runnable, Consumer { +public class ScheduledInstanceRunnable implements Runnable, Consumer { protected final WorkflowDefinition definition; - protected ScheduledInstanceRunnable(WorkflowDefinition definition) { + public ScheduledInstanceRunnable(WorkflowDefinition definition) { this.definition = definition; } @@ -36,9 +36,7 @@ public void run() { @Override public void accept(WorkflowModel model) { WorkflowInstance instance = definition.instance(model); - addScheduledInstance(instance); + definition.addScheduledInstance(instance); definition.application().executorService().execute(() -> instance.start()); } - - protected abstract void addScheduledInstance(WorkflowInstance instance); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledServiceCancellable.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledServiceCancellable.java index 2341f910..45a8d01d 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledServiceCancellable.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledServiceCancellable.java @@ -17,7 +17,7 @@ import java.util.concurrent.ScheduledFuture; -class ScheduledServiceCancellable implements Cancellable { +public class ScheduledServiceCancellable implements Cancellable { private final ScheduledFuture cancellable; diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/WorkflowScheduler.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/WorkflowScheduler.java index 27059f1a..55707ead 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/WorkflowScheduler.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/WorkflowScheduler.java @@ -17,15 +17,12 @@ import io.cloudevents.CloudEvent; import io.serverlessworkflow.impl.WorkflowDefinition; -import io.serverlessworkflow.impl.WorkflowInstance; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo; import java.time.Duration; -import java.util.Collection; import java.util.function.Function; public interface WorkflowScheduler { - Collection scheduledInstances(WorkflowDefinition def); ScheduledEventConsumer eventConsumer( WorkflowDefinition definition, diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/ScheduleEventConsumerTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/ScheduleEventConsumerTest.java index 5e0ba36f..30736f1f 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/ScheduleEventConsumerTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/ScheduleEventConsumerTest.java @@ -57,7 +57,7 @@ void testAllEvent() throws IOException, InterruptedException, ExecutionException WorkflowDefinition definition = appl.workflowDefinition( readWorkflowFromClasspath("workflows-samples/listen-start-all.yaml")); - Collection instances = appl.scheduler().scheduledInstances(definition); + Collection instances = definition.scheduledInstances(); appl.eventPublishers().forEach(p -> p.publish(buildCloudEvent(Map.of("name", "Javierito")))); appl.eventPublishers().forEach(p -> p.publish(buildCloudEvent(Map.of("name", "Fulanito")))); await() @@ -77,7 +77,7 @@ void testOneEvent() throws IOException, InterruptedException, ExecutionException appl.workflowDefinition(readWorkflowFromClasspath("workflows-samples/listen-start.yaml")); appl.eventPublishers().forEach(p -> p.publish(buildCloudEvent(Map.of("name", "Javierito")))); appl.eventPublishers().forEach(p -> p.publish(buildCloudEvent(Map.of("name", "Fulanito")))); - Collection instances = appl.scheduler().scheduledInstances(definition); + Collection instances = definition.scheduledInstances(); await() .pollDelay(Duration.ofMillis(20)) .atMost(Duration.ofMillis(600)) @@ -100,8 +100,8 @@ void testTogether() throws IOException, InterruptedException, ExecutionException readWorkflowFromClasspath("workflows-samples/listen-start-all.yaml")); appl.eventPublishers().forEach(p -> p.publish(buildCloudEvent(Map.of("name", "Javierito")))); appl.eventPublishers().forEach(p -> p.publish(buildCloudEvent(Map.of("name", "Fulanito")))); - Collection oneDefInstances = appl.scheduler().scheduledInstances(oneDef); - Collection allDefInstances = appl.scheduler().scheduledInstances(allDef); + Collection oneDefInstances = oneDef.scheduledInstances(); + Collection allDefInstances = allDef.scheduledInstances(); await() .pollDelay(Duration.ofMillis(40)) .atMost(Duration.ofMillis(980)) diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/SchedulerTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/SchedulerTest.java index 43c19644..8b398963 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/SchedulerTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/SchedulerTest.java @@ -49,11 +49,11 @@ void testAfter() throws IOException, InterruptedException, ExecutionException { try (WorkflowDefinition def = appl.workflowDefinition(readWorkflowFromClasspath("workflows-samples/after-start.yaml"))) { def.instance(Map.of()).start().join(); - assertThat(appl.scheduler().scheduledInstances(def)).isEmpty(); + assertThat(def.scheduledInstances()).isEmpty(); await() .pollDelay(Duration.ofMillis(50)) .atMost(Duration.ofMillis(200)) - .until(() -> appl.scheduler().scheduledInstances(def).size() >= 1); + .until(() -> def.scheduledInstances().size() == 1); } } @@ -64,7 +64,7 @@ void testEvery() throws IOException, InterruptedException, ExecutionException { await() .pollDelay(Duration.ofMillis(20)) .atMost(Duration.ofMillis(200)) - .until(() -> appl.scheduler().scheduledInstances(def).size() >= 5); + .until(() -> def.scheduledInstances().size() >= 5); } } @@ -75,10 +75,10 @@ void testCron() throws IOException, InterruptedException, ExecutionException { appl.workflowDefinition(readWorkflowFromClasspath("workflows-samples/cron-start.yaml"))) { await() .atMost(Duration.ofMinutes(1).plus(Duration.ofSeconds(10))) - .until(() -> appl.scheduler().scheduledInstances(def).size() == 1); + .until(() -> def.scheduledInstances().size() == 1); await() .atMost(Duration.ofMinutes(1).plus(Duration.ofSeconds(10))) - .until(() -> appl.scheduler().scheduledInstances(def).size() == 2); + .until(() -> def.scheduledInstances().size() == 2); } } }