Testing

Module info

To use Pekko Persistence TestKit, add the module to your project:

sbt
val PekkoVersion = "1.1.2"
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-persistence-typed" % PekkoVersion,
  "org.apache.pekko" %% "pekko-persistence-testkit" % PekkoVersion % Test
)
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-bom_${scala.binary.version}</artifactId>
      <version>1.1.2</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-persistence-typed_${scala.binary.version}</artifactId>
  </dependency>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-persistence-testkit_${scala.binary.version}</artifactId>
    <scope>test</scope>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.1.2")

  implementation "org.apache.pekko:pekko-persistence-typed_${versions.ScalaBinary}"
  testImplementation "org.apache.pekko:pekko-persistence-testkit_${versions.ScalaBinary}"
}
Project Info: Pekko Persistence Testkit
Artifact: org.apache.pekko, pekko-persistence-testkit, 1.1.2
JDK versions: OpenJDK 8, OpenJDK 11, OpenJDK 17, OpenJDK 21
Scala versions: 2.13.14, 2.12.20, 3.3.4
JPMS module name: pekko.persistence.testkit
License
Home page: https://pekko.apache.org/
API documentation
Forums
Release notes: Release Notes
Issues: GitHub issues
Sources: https://github.com/apache/pekko

Unit testing

Note! The EventSourcedBehaviorTestKit is a new feature; its API may change in ways that break source compatibility in future versions.

Unit testing of EventSourcedBehavior can be done with the EventSourcedBehaviorTestKit. It supports running one command at a time, and you can assert that the synchronously returned result is as expected. The result contains the events emitted by the command and the new state after applying the events. It also has support for verifying the reply to a command.

You need to configure the ActorSystem with the EventSourcedBehaviorTestKit.config. The configuration enables the in-memory journal and snapshot storage.

Scala
class AccountExampleDocSpec
    extends ScalaTestWithActorTestKit(EventSourcedBehaviorTestKit.config)
Java
@ClassRule
public static final TestKitJunitResource testKit =
    new TestKitJunitResource(EventSourcedBehaviorTestKit.config());

private EventSourcedBehaviorTestKit<
        AccountEntity.Command, AccountEntity.Event, AccountEntity.Account>
    eventSourcedTestKit =
        EventSourcedBehaviorTestKit.create(
            testKit.system(), AccountEntity.create("1", PersistenceId.of("Account", "1")));

A full test for the AccountEntity, which is shown in the Persistence Style Guide, may look like this:

Scala
import org.apache.pekko
import pekko.Done
import pekko.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit
import pekko.persistence.typed.PersistenceId
import pekko.actor.testkit.typed.scaladsl.LogCapturing
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.pattern.StatusReply
import org.scalatest.BeforeAndAfterEach
import org.scalatest.wordspec.AnyWordSpecLike

class AccountExampleDocSpec
    extends ScalaTestWithActorTestKit(EventSourcedBehaviorTestKit.config)
    with AnyWordSpecLike
    with BeforeAndAfterEach
    with LogCapturing {

  private val eventSourcedTestKit =
    EventSourcedBehaviorTestKit[AccountEntity.Command, AccountEntity.Event, AccountEntity.Account](
      system,
      AccountEntity("1", PersistenceId("Account", "1")))

  override protected def beforeEach(): Unit = {
    super.beforeEach()
    eventSourcedTestKit.clear()
  }

  "Account" must {

    "be created with zero balance" in {
      val result = eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.CreateAccount(_))
      result.reply shouldBe StatusReply.Ack
      result.event shouldBe AccountEntity.AccountCreated
      result.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 0
    }

    "handle Withdraw" in {
      eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.CreateAccount(_))

      val result1 = eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Deposit(100, _))
      result1.reply shouldBe StatusReply.Ack
      result1.event shouldBe AccountEntity.Deposited(100)
      result1.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 100

      val result2 = eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Withdraw(10, _))
      result2.reply shouldBe StatusReply.Ack
      result2.event shouldBe AccountEntity.Withdrawn(10)
      result2.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 90
    }

    "reject Withdraw overdraft" in {
      eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.CreateAccount(_))
      eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Deposit(100, _))

      val result = eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Withdraw(110, _))
      result.reply.isError shouldBe true
      result.hasNoEvents shouldBe true
    }

    "handle GetBalance" in {
      eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.CreateAccount(_))
      eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Deposit(100, _))

      val result = eventSourcedTestKit.runCommand[AccountEntity.CurrentBalance](AccountEntity.GetBalance(_))
      result.reply.balance shouldBe 100
      result.hasNoEvents shouldBe true
    }
  }
}
Java
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.math.BigDecimal;
import org.apache.pekko.Done;
import org.apache.pekko.actor.testkit.typed.javadsl.LogCapturing;
import org.apache.pekko.actor.testkit.typed.javadsl.TestKitJunitResource;
import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.pattern.StatusReply;
import org.apache.pekko.persistence.testkit.javadsl.EventSourcedBehaviorTestKit;
import org.apache.pekko.persistence.testkit.javadsl.EventSourcedBehaviorTestKit.CommandResultWithReply;
import org.apache.pekko.persistence.typed.PersistenceId;

import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

public class AccountExampleDocTest
{

  @ClassRule
  public static final TestKitJunitResource testKit =
      new TestKitJunitResource(EventSourcedBehaviorTestKit.config());

  private EventSourcedBehaviorTestKit<
          AccountEntity.Command, AccountEntity.Event, AccountEntity.Account>
      eventSourcedTestKit =
          EventSourcedBehaviorTestKit.create(
              testKit.system(), AccountEntity.create("1", PersistenceId.of("Account", "1")));

  @Rule public final LogCapturing logCapturing = new LogCapturing();

  @Before
  public void beforeEach() {
    eventSourcedTestKit.clear();
  }

  @Test
  public void createWithEmptyBalance() {
    CommandResultWithReply<
            AccountEntity.Command, AccountEntity.Event, AccountEntity.Account, StatusReply<Done>>
        result = eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new);
    assertEquals(StatusReply.ack(), result.reply());
    assertEquals(AccountEntity.AccountCreated.INSTANCE, result.event());
    assertEquals(BigDecimal.ZERO, result.stateOfType(AccountEntity.OpenedAccount.class).balance);
  }

  @Test
  public void createWithUnHandle() {
    CommandResultWithReply<
            AccountEntity.Command, AccountEntity.Event, AccountEntity.Account, StatusReply<Done>>
        result = eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new);
    assertFalse(result.hasNoReply());
  }

  @Test
  public void handleWithdraw() {
    eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new);

    CommandResultWithReply<
            AccountEntity.Command, AccountEntity.Event, AccountEntity.Account, StatusReply<Done>>
        result1 =
            eventSourcedTestKit.runCommand(
                replyTo -> new AccountEntity.Deposit(BigDecimal.valueOf(100), replyTo));
    assertEquals(StatusReply.ack(), result1.reply());
    assertEquals(
        BigDecimal.valueOf(100), result1.eventOfType(AccountEntity.Deposited.class).amount);
    assertEquals(
        BigDecimal.valueOf(100), result1.stateOfType(AccountEntity.OpenedAccount.class).balance);

    CommandResultWithReply<
            AccountEntity.Command, AccountEntity.Event, AccountEntity.Account, StatusReply<Done>>
        result2 =
            eventSourcedTestKit.runCommand(
                replyTo -> new AccountEntity.Withdraw(BigDecimal.valueOf(10), replyTo));
    assertEquals(StatusReply.ack(), result2.reply());
    assertEquals(BigDecimal.valueOf(10), result2.eventOfType(AccountEntity.Withdrawn.class).amount);
    assertEquals(
        BigDecimal.valueOf(90), result2.stateOfType(AccountEntity.OpenedAccount.class).balance);
  }

  @Test
  public void rejectWithdrawOverdraft() {
    eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new);
    eventSourcedTestKit.runCommand(
        (ActorRef<StatusReply<Done>> replyTo) ->
            new AccountEntity.Deposit(BigDecimal.valueOf(100), replyTo));

    CommandResultWithReply<
            AccountEntity.Command, AccountEntity.Event, AccountEntity.Account, StatusReply<Done>>
        result =
            eventSourcedTestKit.runCommand(
                replyTo -> new AccountEntity.Withdraw(BigDecimal.valueOf(110), replyTo));
    assertTrue(result.reply().isError());
    assertTrue(result.hasNoEvents());
  }

  @Test
  public void handleGetBalance() {
    eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new);
    eventSourcedTestKit.runCommand(
        (ActorRef<StatusReply<Done>> replyTo) ->
            new AccountEntity.Deposit(BigDecimal.valueOf(100), replyTo));

    CommandResultWithReply<
            AccountEntity.Command,
            AccountEntity.Event,
            AccountEntity.Account,
            AccountEntity.CurrentBalance>
        result = eventSourcedTestKit.runCommand(AccountEntity.GetBalance::new);
    assertEquals(BigDecimal.valueOf(100), result.reply().balance);
  }
}

Serialization of commands, events and state is verified automatically. The serialization checks can be customized with the SerializationSettings when creating the EventSourcedBehaviorTestKit. By default, the serialization roundtrip is checked, but the equality of the result of the serialization is not. If verifyEquality is enabled, equals must be implemented (or a case class used) in the commands, events and state.
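
The difference between a roundtrip check and an equality check can be illustrated without Pekko at all. The following is a minimal sketch that uses plain Java serialization as a stand-in for whatever serializer is configured; the roundTrip helper and the DepositedNoEquals and DepositedWithEquals classes are hypothetical names invented for this illustration and are not part of the testkit.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical event that relies on Object.equals (reference equality).
final class DepositedNoEquals implements Serializable {
  private static final long serialVersionUID = 1L;
  final long amount;

  DepositedNoEquals(long amount) {
    this.amount = amount;
  }
}

// Hypothetical event with value-based equals and hashCode.
final class DepositedWithEquals implements Serializable {
  private static final long serialVersionUID = 1L;
  final long amount;

  DepositedWithEquals(long amount) {
    this.amount = amount;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof DepositedWithEquals)) return false;
    return amount == ((DepositedWithEquals) o).amount;
  }

  @Override
  public int hashCode() {
    return Long.hashCode(amount);
  }
}

final class RoundTripSketch {
  // Serializes the value to bytes and reads it back (assumed stand-in for a roundtrip check).
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T roundTrip(T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("roundtrip failed", e);
    }
  }
}
```

In this sketch, roundTrip(new DepositedNoEquals(100)) succeeds and preserves the amount, so a roundtrip-only check would pass, yet the copy is not equal to the original. That is the situation an equality check would flag. A DepositedWithEquals copy compares equal to its original.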

To test recovery, the restart method of the EventSourcedBehaviorTestKit can be used. It restarts the behavior, which then recovers from the stored snapshot and the events from previous commands. It’s also possible to populate the storage with events or simulate failures by using the underlying PersistenceTestKit.
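
Conceptually, recovery rebuilds the state by applying the stored events, in order, on top of a starting state (the empty state or the latest snapshot). A minimal, dependency-free sketch of that idea follows. The RecoverySketch class, its event encoding (positive amounts for deposits, negative amounts for withdrawals) and the recover helper are assumptions made for illustration, not testkit API.

```java
import java.math.BigDecimal;
import java.util.List;

final class RecoverySketch {
  // Hypothetical event handler: a signed amount is added to the balance.
  static BigDecimal applyEvent(BigDecimal balance, BigDecimal signedAmount) {
    return balance.add(signedAmount);
  }

  // Replays the stored events, in order, on top of the starting state.
  static BigDecimal recover(BigDecimal startingState, List<BigDecimal> storedEvents) {
    BigDecimal state = startingState;
    for (BigDecimal event : storedEvents) {
      state = applyEvent(state, event);
    }
    return state;
  }
}
```

For example, replaying a deposit of 100 and a withdrawal of 10 on top of a zero balance gives 90, and so does replaying only the withdrawal on top of a snapshot whose balance is 100.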

Persistence TestKit

Note! The PersistenceTestKit is a new feature; its API may change in ways that break source compatibility in future versions.

The Persistence TestKit lets you check the events saved in a storage and emulate storage operations and exceptions. To use the testkit, add the following dependency to your project:

sbt
val PekkoVersion = "1.1.2"
libraryDependencies += "org.apache.pekko" %% "pekko-persistence-testkit" % PekkoVersion
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-bom_${scala.binary.version}</artifactId>
      <version>1.1.2</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-persistence-testkit_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.1.2")

  implementation "org.apache.pekko:pekko-persistence-testkit_${versions.ScalaBinary}"
}

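There are two testkit classes which have a similar API:

  • PersistenceTestKit, for events
  • SnapshotTestKit, for snapshots

The testkit classes have two corresponding plugins which emulate the behavior of the storages:

  • PersistenceTestKitPlugin, which emulates an event storage
  • PersistenceTestKitSnapshotPlugin, which emulates a snapshot storage

Note! The corresponding plugin must be configured in the actor system which is used to initialize the particular testkit class: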

Scala
val yourConfiguration = ConfigFactory.defaultApplication()
val system = ActorSystem(
  ??? /*some behavior*/,
  "test-system",
  PersistenceTestKitPlugin.config.withFallback(yourConfiguration))
val testKit = PersistenceTestKit(system)
Java
public class PersistenceTestKitConfig {

  Config conf =
      PersistenceTestKitPlugin.getInstance()
          .config()
          .withFallback(ConfigFactory.defaultApplication());

  ActorSystem<Command> system = ActorSystem.create(new SomeBehavior(), "example", conf);

  PersistenceTestKit testKit = PersistenceTestKit.create(system);
}

and

Scala
val yourConfiguration = ConfigFactory.defaultApplication()
val system = ActorSystem(
  ??? /*some behavior*/,
  "test-system",
  PersistenceTestKitSnapshotPlugin.config.withFallback(yourConfiguration))
val testKit = SnapshotTestKit(system)
Java
public class SnapshotTestKitConfig {

  Config conf =
      PersistenceTestKitSnapshotPlugin.getInstance()
          .config()
          .withFallback(ConfigFactory.defaultApplication());

  ActorSystem<Command> system = ActorSystem.create(new SomeBehavior(), "example", conf);

  SnapshotTestKit testKit = SnapshotTestKit.create(system);
}

A typical scenario is to create a persistent actor, send commands to it, and check that it persists the expected events:

Scala
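import com.typesafe.config.ConfigFactory
import org.apache.pekko
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.persistence.testkit.PersistenceTestKitPlugin
import pekko.persistence.testkit.scaladsl.PersistenceTestKit
import pekko.persistence.typed.PersistenceId
import pekko.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior }
import org.scalatest.BeforeAndAfterEach
import org.scalatest.wordspec.AnyWordSpecLike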

class PersistenceTestKitSampleSpec
    extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config.withFallback(ConfigFactory.defaultApplication()))
    with AnyWordSpecLike
    with BeforeAndAfterEach {

  val persistenceTestKit = PersistenceTestKit(system)

  override def beforeEach(): Unit = {
    persistenceTestKit.clearAll()
  }

  "Persistent actor" should {

    "persist all events" in {

      val persistenceId = PersistenceId.ofUniqueId("your-persistence-id")
      val persistentActor = spawn(
        EventSourcedBehavior[Cmd, Evt, State](
          persistenceId,
          emptyState = State.empty,
          commandHandler = (_, cmd) => Effect.persist(Evt(cmd.data)),
          eventHandler = (state, evt) => state.updated(evt)))
      val cmd = Cmd("data")

      persistentActor ! cmd

      val expectedPersistedEvent = Evt(cmd.data)
      persistenceTestKit.expectNextPersisted(persistenceId.id, expectedPersistedEvent)
    }

  }
}
Java
public class PersistenceTestKitSampleTest extends AbstractJavaTest {

  @ClassRule
  public static final TestKitJunitResource testKit =
      new TestKitJunitResource(
          PersistenceTestKitPlugin.getInstance()
              .config()
              .withFallback(ConfigFactory.defaultApplication()));

  PersistenceTestKit persistenceTestKit = PersistenceTestKit.create(testKit.system());

  @Before
  public void beforeEach() {
    persistenceTestKit.clearAll();
  }

  @Test
  public void test() {
    PersistenceId persistenceId = PersistenceId.ofUniqueId("some-id");
    ActorRef<YourPersistentBehavior.Cmd> ref =
        testKit.spawn(YourPersistentBehavior.create(persistenceId));

    YourPersistentBehavior.Cmd cmd = new YourPersistentBehavior.Cmd("data");
    ref.tell(cmd);
    YourPersistentBehavior.Evt expectedEventPersisted = new YourPersistentBehavior.Evt(cmd.data);

    persistenceTestKit.expectNextPersisted(persistenceId.id(), expectedEventPersisted);
  }
}

class YourPersistentBehavior
    extends EventSourcedBehavior<
        YourPersistentBehavior.Cmd, YourPersistentBehavior.Evt, YourPersistentBehavior.State> {

  static final class Cmd implements CborSerializable {

    public final String data;

    @JsonCreator
    public Cmd(String data) {
      this.data = data;
    }
  }

  static final class Evt implements CborSerializable {

    public final String data;

    @JsonCreator
    public Evt(String data) {
      this.data = data;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (o == null || getClass() != o.getClass()) return false;

      Evt evt = (Evt) o;

      return data.equals(evt.data);
    }

    @Override
    public int hashCode() {
      return data.hashCode();
    }
  }

  static final class State implements CborSerializable {}

  static Behavior<Cmd> create(PersistenceId persistenceId) {
    return Behaviors.setup(context -> new YourPersistentBehavior(persistenceId));
  }

  private YourPersistentBehavior(PersistenceId persistenceId) {
    super(persistenceId);
  }

  @Override
  public State emptyState() {
    // some state
    return new State();
  }

  @Override
  public CommandHandler<Cmd, Evt, State> commandHandler() {
    return newCommandHandlerBuilder()
        .forAnyState()
        .onCommand(Cmd.class, command -> Effect().persist(new Evt(command.data)))
        .build();
  }

  @Override
  public EventHandler<State, Evt> eventHandler() {
    // TODO handle events
    return newEventHandlerBuilder().forAnyState().onEvent(Evt.class, (state, evt) -> state).build();
  }
}

You can safely use the Persistence TestKit in combination with the main Pekko TestKit.

The main methods of the API (see PersistenceTestKit and SnapshotTestKit for more details) allow you to:

  • check if the given event/snapshot object is the next persisted in the storage.
  • read a sequence of persisted events/snapshots.
  • check that no events/snapshots have been persisted in the storage.
  • throw the default exception from the storage on an attempt to persist, read or delete the following event/snapshot.
  • clear the events/snapshots persisted in the storage.
  • reject the events, but not snapshots (rejections are not supported for snapshots in the original API).
  • set your own policy which emulates the work of the storage. The policy determines what to do when persistence needs to execute some operation on the storage (e.g. read or delete).
  • get all the events/snapshots persisted in the storage.
  • put the events/snapshots in the storage to test recovery.

Setting your own policy for the storage

You can implement and set your own policy for the storage to control its behavior on particular operations; for example, you can fail or reject events under your own conditions. Implement the ProcessingPolicy[EventStorage.JournalOperation] trait (Scala) or the ProcessingPolicy<EventStorage.JournalOperation> interface (Java) for event storage, or the ProcessingPolicy[SnapshotStorage.SnapshotOperation] trait (Scala) or the ProcessingPolicy<SnapshotStorage.SnapshotOperation> interface (Java) for snapshot storage, and set it with the withPolicy() method.

Scala
class PersistenceTestKitSampleSpecWithPolicy
    extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config.withFallback(ConfigFactory.defaultApplication()))
    with AnyWordSpecLike
    with BeforeAndAfterEach {

  val persistenceTestKit = PersistenceTestKit(system)

  override def beforeEach(): Unit = {
    persistenceTestKit.clearAll()
    persistenceTestKit.resetPolicy()
  }

  "Testkit policy" should {

    "fail all operations with custom exception" in {
      val policy = new EventStorage.JournalPolicies.PolicyType {

        class CustomFailure extends RuntimeException

        override def tryProcess(persistenceId: String, processingUnit: JournalOperation): ProcessingResult =
          processingUnit match {
            case WriteEvents(_) => StorageFailure(new CustomFailure)
            case _              => ProcessingSuccess
          }
      }
      persistenceTestKit.withPolicy(policy)

      val persistenceId = PersistenceId.ofUniqueId("your-persistence-id")
      val persistentActor = spawn(
        EventSourcedBehavior[Cmd, Evt, State](
          persistenceId,
          emptyState = State.empty,
          commandHandler = (_, cmd) => Effect.persist(Evt(cmd.data)),
          eventHandler = (state, evt) => state.updated(evt)))

      persistentActor ! Cmd("data")
      persistenceTestKit.expectNothingPersisted(persistenceId.id)

    }
  }
}
Java
public class PersistenceTestKitPolicySampleTest extends AbstractJavaTest {

  @ClassRule
  public static final TestKitJunitResource testKit =
      new TestKitJunitResource(
          PersistenceTestKitPlugin.getInstance()
              .config()
              .withFallback(ConfigFactory.defaultApplication()));

  PersistenceTestKit persistenceTestKit = PersistenceTestKit.create(testKit.system());

  @Before
  public void beforeEach() {
    persistenceTestKit.clearAll();
    persistenceTestKit.resetPolicy();
  }

  @Test
  public void test() {
    SampleEventStoragePolicy policy = new SampleEventStoragePolicy();
    persistenceTestKit.withPolicy(policy);

    PersistenceId persistenceId = PersistenceId.ofUniqueId("some-id");
    ActorRef<YourPersistentBehavior.Cmd> ref =
        testKit.spawn(YourPersistentBehavior.create(persistenceId));

    YourPersistentBehavior.Cmd cmd = new YourPersistentBehavior.Cmd("data");
    ref.tell(cmd);

    persistenceTestKit.expectNothingPersisted(persistenceId.id());
  }

  static class SampleEventStoragePolicy implements ProcessingPolicy<JournalOperation> {
    @Override
    public ProcessingResult tryProcess(String processId, JournalOperation processingUnit) {
      if (processingUnit instanceof WriteEvents) {
        return StorageFailure.create();
      } else {
        return ProcessingSuccess.getInstance();
      }
    }
  }
}

The tryProcess() method of the ProcessingPolicy takes two arguments: the persistence id and the storage operation.

Event storage has the following operations:

  • ReadEvents
  • WriteEvents
  • DeleteEvents
  • ReadSeqNum

Snapshot storage has the following operations:

  • ReadSnapshot
  • WriteSnapshot
  • DeleteSnapshotsByCriteria
  • DeleteSnapshotByMeta

The tryProcess() method must return one of the processing results:

  • ProcessingSuccess
  • StorageFailure
  • Reject

Note that snapshot storage does not have rejections. If you return Reject in the tryProcess() of the snapshot storage policy, it has the same effect as StorageFailure.
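
The interplay between a policy and the storage can be pictured with a small model that does not depend on Pekko. In this sketch the SketchResult enum, the SketchPolicy interface and the InMemoryJournalSketch class are hypothetical stand-ins, not testkit types: the storage consults the policy before every write and stores the event only when the policy answers with success. The snapshotEffective helper mirrors the rule that a rejection coming from a snapshot policy behaves like a failure.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the three processing results.
enum SketchResult {
  SUCCESS,
  REJECT,
  FAILURE
}

// Hypothetical stand-in for a policy: persistence id and operation in, result out.
interface SketchPolicy {
  SketchResult tryProcess(String persistenceId, String operation);
}

final class InMemoryJournalSketch {
  private final List<String> events = new ArrayList<>();
  // Default policy of this model: every operation succeeds.
  private SketchPolicy policy = (persistenceId, operation) -> SketchResult.SUCCESS;

  void withPolicy(SketchPolicy newPolicy) {
    this.policy = newPolicy;
  }

  // Asks the policy first; the event is stored only on SUCCESS.
  SketchResult write(String persistenceId, String event) {
    SketchResult result = policy.tryProcess(persistenceId, "write");
    if (result == SketchResult.SUCCESS) {
      events.add(event);
    }
    return result;
  }

  // Returns a copy of everything stored so far.
  List<String> persisted() {
    return new ArrayList<>(events);
  }

  // Models the snapshot rule: a REJECT is treated like a FAILURE.
  static SketchResult snapshotEffective(SketchResult fromPolicy) {
    return fromPolicy == SketchResult.REJECT ? SketchResult.FAILURE : fromPolicy;
  }
}
```

With the default policy of this model a write is stored. After withPolicy((id, op) -> SketchResult.FAILURE) the next write is not stored, which corresponds to what the policy test above observes with expectNothingPersisted.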

Here is an example of the policy for an event storage:

Scala
import org.apache.pekko.persistence.testkit._

class SampleEventStoragePolicy extends EventStorage.JournalPolicies.PolicyType {

  // you can use internal state, it does not need to be thread safe
  var count = 1

  override def tryProcess(persistenceId: String, processingUnit: JournalOperation): ProcessingResult =
    if (count < 10) {
      count += 1
      // check the type of operation and react with success or with reject or with failure.
      // if you return ProcessingSuccess the operation will be performed, otherwise not.
      processingUnit match {
        case ReadEvents(batch) if batch.nonEmpty => ProcessingSuccess
        case WriteEvents(batch) if batch.size > 1 =>
          ProcessingSuccess
        case ReadSeqNum      => StorageFailure()
        case DeleteEvents(_) => Reject()
        case _               => StorageFailure()
      }
    } else {
      ProcessingSuccess
    }

}
Java
class SampleEventStoragePolicy implements ProcessingPolicy<JournalOperation> {

  // you can use internal state, it does not need to be thread safe
  int count = 1;

  @Override
  public ProcessingResult tryProcess(String processId, JournalOperation processingUnit) {
    // check the type of operation and react with success or with reject or with failure.
    // if you return ProcessingSuccess the operation will be performed, otherwise not.
    if (count < 10) {
      count += 1;
      if (processingUnit instanceof ReadEvents) {
        ReadEvents read = (ReadEvents) processingUnit;
        if (read.batch().nonEmpty()) {
          return ProcessingSuccess.getInstance();
        } else {
          return StorageFailure.create();
        }
      } else if (processingUnit instanceof WriteEvents) {
        return ProcessingSuccess.getInstance();
      } else if (processingUnit instanceof DeleteEvents) {
        return ProcessingSuccess.getInstance();
      } else if (processingUnit.equals(ReadSeqNum.getInstance())) {
        return Reject.create();
      }
      // you can set your own exception
      return StorageFailure.create(new RuntimeException("your exception"));
    } else {
      return ProcessingSuccess.getInstance();
    }
  }
}

Here is an example of the policy for a snapshot storage:

Scala
class SampleSnapshotStoragePolicy extends SnapshotStorage.SnapshotPolicies.PolicyType {

  // you can use internal state, it does not need to be thread safe
  var count = 1

  override def tryProcess(persistenceId: String, processingUnit: SnapshotOperation): ProcessingResult =
    if (count < 10) {
      count += 1
      // check the type of operation and react with success or with reject or with failure.
      // if you return ProcessingSuccess the operation will be performed, otherwise not.
      processingUnit match {
        case ReadSnapshot(_, payload) if payload.nonEmpty =>
          ProcessingSuccess
        case WriteSnapshot(meta, payload) if meta.sequenceNr > 10 =>
          ProcessingSuccess
        case DeleteSnapshotsByCriteria(_) => StorageFailure()
        case DeleteSnapshotByMeta(meta) if meta.sequenceNr < 10 =>
          ProcessingSuccess
        case _ => StorageFailure()
      }
    } else {
      ProcessingSuccess
    }
}
Java
class SnapshotStoragePolicy implements ProcessingPolicy<SnapshotOperation> {

  // you can use internal state, it doesn't need to be thread safe
  int count = 1;

  @Override
  public ProcessingResult tryProcess(String processId, SnapshotOperation processingUnit) {
    // check the type of operation and react with success or with failure.
    // if you return ProcessingSuccess the operation will be performed, otherwise not.
    if (count < 10) {
      count += 1;
      if (processingUnit instanceof ReadSnapshot) {
        ReadSnapshot read = (ReadSnapshot) processingUnit;
        if (read.getSnapshot().isPresent()) {
          return ProcessingSuccess.getInstance();
        } else {
          return StorageFailure.create();
        }
      } else if (processingUnit instanceof WriteSnapshot) {
        return ProcessingSuccess.getInstance();
      } else if (processingUnit instanceof DeleteSnapshotsByCriteria) {
        return ProcessingSuccess.getInstance();
      } else if (processingUnit instanceof DeleteSnapshotByMeta) {
        return ProcessingSuccess.getInstance();
      }
      // you can set your own exception
      return StorageFailure.create(new RuntimeException("your exception"));
    } else {
      return ProcessingSuccess.getInstance();
    }
  }
}

Configuration of Persistence TestKit

There are several configuration properties for the Persistence TestKit; please refer to the reference configuration.

Integration testing

EventSourcedBehavior actors can be tested with the ActorTestKit together with other actors. The in-memory journal and snapshot storage from the Persistence TestKit can be used also for integration style testing of a single ActorSystem, for example when using Cluster Sharding with a single Cluster node.

For tests that involve more than one Cluster node you have to use another journal and snapshot store. While it’s possible to use the Persistence Plugin Proxy it’s often better and more realistic to use a real database.

Plugin initialization

Some Persistence plugins create tables automatically, but have the limitation that this can’t be done concurrently from several ActorSystems. That can be a problem if the test creates a Cluster and all nodes try to initialize the plugins at the same time. To coordinate initialization you can use the PersistenceInit utility.

PersistenceInit is part of pekko-persistence-testkit and you need to add the dependency to your project:

sbt
val PekkoVersion = "1.1.2"
libraryDependencies += "org.apache.pekko" %% "pekko-persistence-testkit" % PekkoVersion
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.pekko</groupId>
      <artifactId>pekko-bom_${scala.binary.version}</artifactId>
      <version>1.1.2</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>org.apache.pekko</groupId>
    <artifactId>pekko-persistence-testkit_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation platform("org.apache.pekko:pekko-bom_${versions.ScalaBinary}:1.1.2")

  implementation "org.apache.pekko:pekko-persistence-testkit_${versions.ScalaBinary}"
}
Scala
import org.apache.pekko.Done
import org.apache.pekko.persistence.testkit.scaladsl.PersistenceInit

import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._

val timeout = 5.seconds
val done: Future[Done] = PersistenceInit.initializeDefaultPlugins(system, timeout)
Await.result(done, timeout)
Java
import org.apache.pekko.persistence.testkit.javadsl.PersistenceInit;
import org.apache.pekko.Done;

import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

Duration timeout = Duration.ofSeconds(5);
CompletionStage<Done> done =
    PersistenceInit.initializeDefaultPlugins(testKit.system(), timeout);
done.toCompletableFuture().get(timeout.getSeconds(), TimeUnit.SECONDS);