diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/ProtocolException.java b/sdk-core/src/main/java/dev/restate/sdk/core/ProtocolException.java index ec3767d6..0b0cb9f7 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/ProtocolException.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/ProtocolException.java @@ -20,17 +20,9 @@ public class ProtocolException extends RuntimeException { public static final int INTERNAL_CODE = 500; public static final int JOURNAL_MISMATCH_CODE = 570; static final int PROTOCOL_VIOLATION_CODE = 571; - static final int CLOSED_CODE = 598; - - @SuppressWarnings("StaticAssignmentOfThrowable") - static final ProtocolException CLOSED = new ProtocolException("Invocation closed", CLOSED_CODE); private final int code; - private ProtocolException(String message) { - this(message, TerminalException.INTERNAL_SERVER_ERROR_CODE); - } - public ProtocolException(String message, int code) { this(message, code, null); } diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/CommandType.java b/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/CommandType.java index 28e7808e..c983bed6 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/CommandType.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/CommandType.java @@ -32,46 +32,25 @@ enum CommandType { /** Convert a CommandType to a MessageType. */ public MessageType toMessageType() { - switch (this) { - case INPUT: - return MessageType.InputCommandMessage; - case OUTPUT: - return MessageType.OutputCommandMessage; - case GET_STATE: - return MessageType.GetLazyStateCommandMessage; - case GET_STATE_KEYS: - return MessageType.GetLazyStateKeysCommandMessage; - case SET_STATE: - return MessageType.SetStateCommandMessage; - case CLEAR_STATE: - return MessageType.ClearStateCommandMessage; - case CLEAR_ALL_STATE: - return MessageType.ClearAllStateCommandMessage; - case GET_PROMISE: - return MessageType.GetPromiseCommandMessage; - case PEEK_PROMISE: - return MessageType.PeekPromiseCommandMessage; - case COMPLETE_PROMISE: - return MessageType.CompletePromiseCommandMessage; - case SLEEP: - return MessageType.SleepCommandMessage; - case CALL: - return MessageType.CallCommandMessage; - case ONE_WAY_CALL: - return MessageType.OneWayCallCommandMessage; - case SEND_SIGNAL: - case CANCEL_INVOCATION: - return MessageType.SendSignalCommandMessage; - case RUN: - return MessageType.RunCommandMessage; - case ATTACH_INVOCATION: - return MessageType.AttachInvocationCommandMessage; - case GET_INVOCATION_OUTPUT: - return MessageType.GetInvocationOutputCommandMessage; - case COMPLETE_AWAKEABLE: - return MessageType.CompleteAwakeableCommandMessage; - default: - throw new IllegalStateException("Unexpected command type: " + this); - } + return switch (this) { + case INPUT -> MessageType.InputCommandMessage; + case OUTPUT -> MessageType.OutputCommandMessage; + case GET_STATE -> MessageType.GetLazyStateCommandMessage; + case GET_STATE_KEYS -> MessageType.GetLazyStateKeysCommandMessage; + case SET_STATE -> MessageType.SetStateCommandMessage; + case CLEAR_STATE -> MessageType.ClearStateCommandMessage; + case CLEAR_ALL_STATE -> MessageType.ClearAllStateCommandMessage; + case GET_PROMISE -> MessageType.GetPromiseCommandMessage; + case PEEK_PROMISE -> MessageType.PeekPromiseCommandMessage; + case COMPLETE_PROMISE -> MessageType.CompletePromiseCommandMessage; + case SLEEP -> MessageType.SleepCommandMessage; + case CALL -> MessageType.CallCommandMessage; + case ONE_WAY_CALL -> MessageType.OneWayCallCommandMessage; + case SEND_SIGNAL, CANCEL_INVOCATION -> MessageType.SendSignalCommandMessage; + case RUN -> MessageType.RunCommandMessage; + case ATTACH_INVOCATION -> MessageType.AttachInvocationCommandMessage; + case GET_INVOCATION_OUTPUT -> MessageType.GetInvocationOutputCommandMessage; + case COMPLETE_AWAKEABLE -> MessageType.CompleteAwakeableCommandMessage; + }; } } diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/EntryHeaderChecker.java b/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/EntryHeaderChecker.java index c83f37e6..6f437e6c 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/EntryHeaderChecker.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/EntryHeaderChecker.java @@ -47,13 +47,11 @@ public static EntryHeaderChecker check( throw new ProtocolException( "Found a mismatch between the code paths taken during the previous execution and the paths taken during this execution.\n" + "This typically happens when some parts of the code are non-deterministic.\n" - + "- The mismatch happened with message types not matching\n" - + "- Difference:\n" - + " Message types don't match: expected " - + expectedClass.getSimpleName() - + ", actual " - + actual.getClass().getSimpleName() - + "\n", + + "- Expecting command '" + + Util.commandMessageToString(expected) + + "' but was '" + + Util.commandMessageToString(actual) + + "'", JOURNAL_MISMATCH_CODE); } return new EntryHeaderChecker<>(expected, (E) actual); @@ -98,18 +96,18 @@ private ProtocolException createMismatchException() { "Found a mismatch between the code paths taken during the previous execution and the paths taken during this execution.\n" + "This typically happens when some parts of the code are non-deterministic.\n" + "- The mismatch happened while executing '" - + expected.getClass().getSimpleName() + + Util.commandMessageToString(expected) + "'\n" - + "- Difference:\n"); + + "- Difference:"); for (FieldMismatch mismatch : mismatches) { customMessage - .append(" ") + .append("\n ") .append(mismatch.fieldName) .append(": '") .append(mismatch.expectedValue) .append("' != '") .append(mismatch.actualValue) - .append("'\n"); + .append("'"); } return new ProtocolException(customMessage.toString(), JOURNAL_MISMATCH_CODE); diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/Util.java b/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/Util.java index e1ea863e..9d1083bd 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/Util.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/Util.java @@ -9,17 +9,14 @@ package dev.restate.sdk.core.statemachine; import com.google.protobuf.ByteString; +import com.google.protobuf.MessageLite; import com.google.protobuf.UnsafeByteOperations; import dev.restate.common.Slice; import dev.restate.sdk.common.TerminalException; -import dev.restate.sdk.core.ProtocolException; import dev.restate.sdk.core.generated.protocol.Protocol; -import java.io.PrintWriter; -import java.io.StringWriter; import java.nio.ByteBuffer; import java.time.Duration; import java.util.Objects; -import org.jspecify.annotations.Nullable; public class Util { @@ -38,46 +35,6 @@ static Protocol.Failure toProtocolFailure(Throwable throwable) { return toProtocolFailure(TerminalException.INTERNAL_SERVER_ERROR_CODE, throwable.toString()); } - static Protocol.ErrorMessage toErrorMessage( - Throwable throwable, - int currentCommandIndex, - @Nullable String currentCommandName, - @Nullable MessageType currentCommandType) { - Protocol.ErrorMessage.Builder msg = Protocol.ErrorMessage.newBuilder(); - - if (throwable.getMessage() == null) { - // This happens only with few common exceptions, but anyway - msg.setMessage(throwable.toString()); - } else { - msg.setMessage(throwable.getMessage()); - } - - if (throwable instanceof ProtocolException) { - msg.setCode(((ProtocolException) throwable).getCode()); - } else { - msg.setCode(TerminalException.INTERNAL_SERVER_ERROR_CODE); - } - - // Convert stacktrace to string - StringWriter sw = new StringWriter(); - PrintWriter pw = new PrintWriter(sw); - throwable.printStackTrace(pw); - msg.setStacktrace(sw.toString()); - - // Add journal entry info - if (currentCommandIndex >= 0) { - msg.setRelatedCommandIndex(currentCommandIndex); - } - if (currentCommandName != null) { - msg.setRelatedCommandName(currentCommandName); - } - if (currentCommandType != null) { - msg.setRelatedCommandType(currentCommandType.encode()); - } - - return msg.build(); - } - static TerminalException toRestateException(Protocol.Failure failure) { return new TerminalException(failure.getCode(), failure.getMessage()); } @@ -100,6 +57,54 @@ static Duration durationMin(Duration a, Duration b) { return (a.compareTo(b) <= 0) ? a : b; } + /** + * Returns a string representation of a command message. + * + * @param message The command message + * @return A string representation of the command message + */ + static String commandMessageToString(MessageLite message) { + if (message instanceof Protocol.InputCommandMessage) { + return "handler input"; + } else if (message instanceof Protocol.OutputCommandMessage) { + return "handler return"; + } else if (message instanceof Protocol.GetLazyStateCommandMessage) { + return "get state"; + } else if (message instanceof Protocol.GetLazyStateKeysCommandMessage) { + return "get state keys"; + } else if (message instanceof Protocol.SetStateCommandMessage) { + return "set state"; + } else if (message instanceof Protocol.ClearStateCommandMessage) { + return "clear state"; + } else if (message instanceof Protocol.ClearAllStateCommandMessage) { + return "clear all state"; + } else if (message instanceof Protocol.GetPromiseCommandMessage) { + return "get promise"; + } else if (message instanceof Protocol.PeekPromiseCommandMessage) { + return "peek promise"; + } else if (message instanceof Protocol.CompletePromiseCommandMessage) { + return "complete promise"; + } else if (message instanceof Protocol.SleepCommandMessage) { + return "sleep"; + } else if (message instanceof Protocol.CallCommandMessage) { + return "call"; + } else if (message instanceof Protocol.OneWayCallCommandMessage) { + return "one way call/send"; + } else if (message instanceof Protocol.SendSignalCommandMessage) { + return "send signal"; + } else if (message instanceof Protocol.RunCommandMessage) { + return "run"; + } else if (message instanceof Protocol.AttachInvocationCommandMessage) { + return "attach invocation"; + } else if (message instanceof Protocol.GetInvocationOutputCommandMessage) { + return "get invocation output"; + } else if (message instanceof Protocol.CompleteAwakeableCommandMessage) { + return "complete awakeable"; + } + + return message.getClass().getSimpleName(); + } + private static final class ByteStringSlice implements Slice { private final ByteString byteString;