Skip to content

Commit

Permalink
Proof of concept support for virtual threads using a mjar
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns committed Apr 15, 2024
1 parent 920a361 commit 5afa1d3
Show file tree
Hide file tree
Showing 32 changed files with 344 additions and 34 deletions.
8 changes: 4 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
buildscript {
ext {
palantirGitVersionVersion = "${JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_11) ? '0.15.0' : '0.13.0'}"
kotlinVersion = "${project.hasProperty("edgeDepsTest") ? '1.8.20' : (JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_16) ? '1.5.32' : '1.4.32')}"
kotlinVersion = "${project.hasProperty("edgeDepsTest") ? '1.8.20' : '1.6.20' }"
}
}

plugins {
id 'net.ltgt.errorprone' version '3.0.1' apply false
id 'net.ltgt.errorprone' version '3.1.0' apply false
id 'org.cadixdev.licenser' version '0.6.1'
id 'com.palantir.git-version' version "${palantirGitVersionVersion}" apply false
id 'io.github.gradle-nexus.publish-plugin' version '1.3.0'
id 'com.diffplug.spotless' version '6.18.0' apply false
id 'com.diffplug.spotless' version '6.25.0' apply false
id 'com.github.nbaztec.coveralls-jacoco' version "1.2.15" apply false

// id 'org.jetbrains.kotlin.jvm' version '1.4.32'
Expand Down Expand Up @@ -61,7 +61,7 @@ ext {
// test scoped
// we don't upgrade to 1.3 and 1.4 because they require slf4j 2.x
logbackVersion = project.hasProperty("edgeDepsTest") ? '1.3.5' : '1.2.11'
mockitoVersion = '5.2.0'
mockitoVersion = '5.11.0'
junitVersion = '4.13.2'
}

Expand Down
2 changes: 1 addition & 1 deletion gradle/errorprone.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ subprojects {
dependencies {
errorproneJavac('com.google.errorprone:javac:9+181-r4173-1')
if (JavaVersion.current().isJava11Compatible()) {
errorprone('com.google.errorprone:error_prone_core:2.18.0')
errorprone('com.google.errorprone:error_prone_core:2.26.0')
} else {
errorprone('com.google.errorprone:error_prone_core:2.10.0')
}
Expand Down
14 changes: 7 additions & 7 deletions gradle/java.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ subprojects {
apply plugin: 'java-library'

java {
// graal only supports java 8, 11, 16, 17
sourceCompatibility = project.hasProperty("edgeDepsTest") ? JavaVersion.VERSION_17 : JavaVersion.VERSION_1_8
targetCompatibility = project.hasProperty("edgeDepsTest") ? JavaVersion.VERSION_17 : JavaVersion.VERSION_1_8
// graal only supports java 8, 11, 16, 17, 21
sourceCompatibility = project.hasProperty("edgeDepsTest") ? JavaVersion.VERSION_21 : JavaVersion.VERSION_21
targetCompatibility = project.hasProperty("edgeDepsTest") ? JavaVersion.VERSION_21 : JavaVersion.VERSION_21
withJavadocJar()
withSourcesJar()
}
Expand All @@ -24,10 +24,10 @@ subprojects {
compileTestJava {
options.encoding = 'UTF-8'
options.compilerArgs << '-Xlint:none' << '-Xlint:deprecation' << '-Werror' << '-parameters'
if (JavaVersion.current().isJava9Compatible() && !project.hasProperty("edgeDepsTest")) {
// https://stackoverflow.com/a/43103115/525203
options.compilerArgs.addAll(['--release', '8'])
}
// if (JavaVersion.current().isJava9Compatible() && !project.hasProperty("edgeDepsTest")) {
// // https://stackoverflow.com/a/43103115/525203
// options.compilerArgs.addAll(['--release', '8'])
// }
}

javadoc {
Expand Down
2 changes: 1 addition & 1 deletion gradle/linting.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ subprojects {
target 'src/*/java/**/*.java'
targetExclude '**/generated/*'
targetExclude '**/.idea/**'
googleJavaFormat('1.16.0')
googleJavaFormat('1.17.0')
}

kotlin {
Expand Down
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-7.6.1-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip
networkTimeout=10000
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class StandardTagNames {
public static final String RUN_ID = "runId";
public static final String PARENT_WORKFLOW_ID = "parentWorkflowId";
public static final String PARENT_RUN_ID = "parentRunId";

/**
* @deprecated use {@link io.opentracing.tag.Tags#ERROR}
*/
Expand Down
28 changes: 28 additions & 0 deletions temporal-sdk/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,34 @@ dependencies {
testImplementation group: 'ch.qos.logback', name: 'logback-classic', version: "${logbackVersion}"
}

if (JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_21)) {
sourceSets {
java21 {
java {
srcDirs = ['src/main/java21']
}
}
}

dependencies {
java21Implementation files(sourceSets.main.output.classesDirs) { builtBy compileJava }
}

compileJava21Java {
targetCompatibility = JavaVersion.VERSION_21
sourceCompatibility = JavaVersion.VERSION_21
}

jar {
into('META-INF/versions/21') {
from sourceSets.java21.output
}
manifest.attributes(
'Multi-Release': 'true'
)
}
}

task registerNamespace(type: JavaExec) {
getMainClass().set('io.temporal.internal.docker.RegisterTestNamespace')
classpath = sourceSets.test.runtimeClasspath
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public interface WorkflowClient {

/** Use this constant as a query type to get a workflow stack trace. */
String QUERY_TYPE_STACK_TRACE = "__stack_trace";

/** Replays workflow to the current state and returns empty result or error if replay failed. */
String QUERY_TYPE_REPLAY_ONLY = "__replay_only";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
public class WorkflowExecutionHistory {
protected static final String DEFAULT_WORKFLOW_ID = "workflow_id_in_replay";
private static final Gson GSON_PRETTY_PRINTER = new GsonBuilder().setPrettyPrinting().create();

// we stay on using the old API that uses a JsonParser instance instead of static methods
// to give users a larger range of supported version
@SuppressWarnings("deprecation")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public interface LocalActivityCallback
class LocalActivityFailedException extends RuntimeException {
private final @Nonnull Failure failure;
private final int lastAttempt;

/**
* If this is not null, code that processes this exception will schedule a workflow timer to
* continue retrying the execution
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ final class LocalActivityStateMachine

private ExecuteLocalActivityParameters localActivityParameters;
private final Functions.Func<Boolean> replaying;

/** Accepts proposed current time. Returns accepted current time. */
private final Functions.Func1<Long, Long> setCurrentTimeCallback;

Expand All @@ -74,6 +75,7 @@ final class LocalActivityStateMachine

/** Workflow timestamp when the LA state machine is initialized */
private final long workflowTimeMillisWhenStarted;

/**
* System.nanoTime result at the moment of LA state machine initialization. May be used to
* calculate elapsed time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ final class VersionStateMachine {
private final Functions.Proc1<StateMachine> stateMachineSink;

@Nullable private Integer version;

/**
* This variable is used for replay only. When we replay, we look one workflow task ahead and
* preload all version markers to be able to return from Workflow.getVersion called in the event
Expand Down Expand Up @@ -368,6 +369,7 @@ private VersionStateMachine(
this.commandSink = Objects.requireNonNull(commandSink);
this.stateMachineSink = stateMachineSink;
}

/**
* Get the version for this state machine.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ private static void popCurrent(CancellationScopeImpl expected) {
private final Runnable runnable;
private CancellationScopeImpl parent;
private final Set<CancellationScopeImpl> children = new HashSet<>();

/**
* When disconnected scope has no parent and thus doesn't receive cancellation requests from it.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,11 @@ public final class POJOWorkflowImplementationFactory implements ReplayWorkflowFa
/** Key: workflow type name, Value: function that creates SyncWorkflowDefinition instance. */
private final Map<String, Functions.Func1<WorkflowExecution, SyncWorkflowDefinition>>
workflowDefinitions = Collections.synchronizedMap(new HashMap<>());

/** Factories providing instances of workflow classes. */
private final Map<Class<?>, Functions.Func<?>> workflowInstanceFactories =
Collections.synchronizedMap(new HashMap<>());

/** If present then it is called for any unknown workflow type. */
private Functions.Func<? extends DynamicWorkflow> dynamicWorkflowImplementationFactory;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package io.temporal.internal.task;

public interface ThreadConfigurator {
void configure(Thread t);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (C) 2024 Temporal Technologies, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.task;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;

/**
* Internal delegate for virtual thread handling on JDK 21.
* This is a dummy version for reachability on JDK <21.
*/
public final class VirtualThreadDelegate {

public VirtualThreadDelegate() {
throw new UnsupportedOperationException("Virtual threads not supported on JDK <21");
}

public ThreadFactory virtualThreadFactory() {
throw new UnsupportedOperationException();
}

public ExecutorService newVirtualThreadExecutor(ThreadConfigurator configurator) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public final class LocalActivityResult {
private final @Nullable RespondActivityTaskCompletedRequest executionCompleted;
private final @Nullable ExecutionFailedResult executionFailed;
private final @Nullable RespondActivityTaskCanceledRequest executionCanceled;

/**
* If present, it will cause an immediate WFT failure instead of providing LA result to the
* workflow code.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
public class WorkflowSerializationContext implements HasWorkflowSerializationContext {
private final @Nonnull String namespace;
private final @Nonnull String workflowId;

// We can't currently reliably and consistency provide workflowType to the DataConverter.
// 1. Signals and queries don't know workflowType when they are sent.
// 2. WorkflowStub#getResult call is not aware of the workflowType, workflowType is an optional
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.uber.m3.tally.Scope;
import io.temporal.internal.sync.WorkflowThreadExecutor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -34,11 +35,11 @@
* reasons. {@link ThreadPoolExecutor#getActiveCount()} take a pool-wide lock.
*/
class ActiveThreadReportingExecutor implements WorkflowThreadExecutor {
private final ThreadPoolExecutor workflowThreadPool;
private final ExecutorService workflowThreadPool;
private final Scope metricsScope;
private final AtomicInteger tasksInFlight = new AtomicInteger();

ActiveThreadReportingExecutor(ThreadPoolExecutor workflowThreadPool, Scope metricsScope) {
ActiveThreadReportingExecutor(ExecutorService workflowThreadPool, Scope metricsScope) {
this.workflowThreadPool = workflowThreadPool;
this.metricsScope = metricsScope;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ private MetricsType() {}
TEMPORAL_METRICS_PREFIX + "workflow_task_schedule_to_start_latency";
public static final String WORKFLOW_TASK_EXECUTION_LATENCY =
TEMPORAL_METRICS_PREFIX + "workflow_task_execution_latency";

/** Total latency of a workflow task which can include multiple forced decision tasks */
public static final String WORKFLOW_TASK_EXECUTION_TOTAL_LATENCY =
TEMPORAL_METRICS_PREFIX + "workflow_task_execution_total_latency";
Expand All @@ -64,6 +65,7 @@ private MetricsType() {}
/** Workflow task failed, possibly failing workflow or reporting failure to the service. */
public static final String WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER =
TEMPORAL_METRICS_PREFIX + "workflow_task_execution_failed";

/**
* Workflow task failed with unhandled exception without replying to the service.<br>
* This typically happens when workflow task fails second time in a row.<br>
Expand Down Expand Up @@ -93,6 +95,7 @@ private MetricsType() {}
TEMPORAL_METRICS_PREFIX + "activity_execution_failed";
public static final String ACTIVITY_EXEC_CANCELLED_COUNTER =
TEMPORAL_METRICS_PREFIX + "activity_execution_cancelled";

/**
* @deprecated use {@link #ACTIVITY_EXEC_CANCELLED_COUNTER}
*/
Expand All @@ -113,6 +116,7 @@ private MetricsType() {}

public static final String LOCAL_ACTIVITY_EXEC_CANCELLED_COUNTER =
TEMPORAL_METRICS_PREFIX + "local_activity_execution_cancelled";

/**
* @deprecated use {@link #LOCAL_ACTIVITY_EXEC_CANCELLED_COUNTER}
*/
Expand All @@ -122,6 +126,7 @@ private MetricsType() {}

public static final String LOCAL_ACTIVITY_EXEC_FAILED_COUNTER =
TEMPORAL_METRICS_PREFIX + "local_activity_execution_failed";

/**
* @deprecated use {@link #LOCAL_ACTIVITY_EXEC_FAILED_COUNTER}
*/
Expand All @@ -144,6 +149,7 @@ private MetricsType() {}
public static final String STICKY_CACHE_HIT = TEMPORAL_METRICS_PREFIX + "sticky_cache_hit";
// tagged with namespace, task_queue, worker_type, workflow_type
public static final String STICKY_CACHE_MISS = TEMPORAL_METRICS_PREFIX + "sticky_cache_miss";

// tagged with namespace, task_queue, worker_type, workflow_type
@Deprecated
// This metric in its current form is useless, it's not possible for users to interpret it for any
Expand All @@ -161,6 +167,7 @@ private MetricsType() {}
// Otherwise deprecate it everywhere and remove from docs.
public static final String STICKY_CACHE_TOTAL_FORCED_EVICTION =
TEMPORAL_METRICS_PREFIX + "sticky_cache_total_forced_eviction";

// tagged with namespace, task_queue, worker_type, workflow_type
public static final String STICKY_CACHE_THREAD_FORCED_EVICTION =
TEMPORAL_METRICS_PREFIX + "sticky_cache_thread_forced_eviction";
Expand Down
Loading

0 comments on commit 5afa1d3

Please sign in to comment.