Pregunta Secuencia de eventos Akka Persistence Query y CQRS


Estoy tratando de implementar el lado de lectura en mi arquitectura ES-CQRS. Digamos que tengo un actor persistente como este:

object UserWrite {

  sealed trait UserEvent
  sealed trait State
  case object Uninitialized extends State
  case class User(username: String, password: String) extends State
  case class AddUser(user: User)
  case class UserAdded(user: User) extends UserEvent
  case class UserEvents(userEvents: Source[(Long, UserEvent), NotUsed])
  case class UsersStream(fromSeqNo: Long)
  case object GetCurrentUser

  def props = Props(new UserWrite)
}

class UserWrite extends PersistentActor {

  import UserWrite._

  private var currentUser: State = Uninitialized

  override def persistenceId: String = "user-write"

  override def receiveRecover: Receive = {
    case UserAdded(user) => currentUser = user
  }

  override def receiveCommand: Receive = {
    case AddUser(user: User) => persist(UserAdded(user)) {
      case UserAdded(`user`) => currentUser = user
    }
    case UsersStream(fromSeqNo: Long) => publishUserEvents(fromSeqNo)
    case GetCurrentUser => sender() ! currentUser
  }

  def publishUserEvents(fromSeqNo: Long) = {
    val readJournal = PersistenceQuery(context.system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
    val userEvents = readJournal
      .eventsByPersistenceId("user-write", fromSeqNo, Long.MaxValue)
      .map { case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event }
    sender() ! UserEvents(userEvents)
  }
}

Por lo que yo entiendo, cada vez que el evento persiste, podemos publicarlo a través de Akka Persistence Query. Ahora, no estoy seguro de cuál sería la forma adecuada de suscribirme a estos eventos, así que puedo persistir en mi base de datos de lectura. Una de las ideas es enviar inicialmente un UsersStream mensaje de mi lado leído actor para UserWrite actor y eventos de "hundimiento" en ese actor de lectura.

EDITAR

Siguiendo la sugerencia de @cmbaxter, implementé el lado de lectura de esta manera:

object UserRead {

  case object GetUsers
  case class GetUserByUsername(username: String)
  case class LastProcessedEventOffset(seqNo: Long)
  case object StreamCompleted

  def props = Props(new UserRead)
}

class UserRead extends PersistentActor {
  import UserRead._

  var inMemoryUsers = Set.empty[User]
  var offset        = 0L

  override val persistenceId: String = "user-read"

  override def receiveRecover: Receive = {
    // Recovery from snapshot will always give us last sequence number
    case SnapshotOffer(_, LastProcessedEventOffset(seqNo)) => offset = seqNo
    case RecoveryCompleted                                 => recoveryCompleted()
  }

  // After recovery is being completed, events will be projected to UserRead actor
  def recoveryCompleted(): Unit = {
    implicit val materializer = ActorMaterializer()
    PersistenceQuery(context.system)
      .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
      .eventsByPersistenceId("user-write", offset + 1, Long.MaxValue)
      .map {
        case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event
      }
      .runWith(Sink.actorRef(self, StreamCompleted))
  }

  override def receiveCommand: Receive = {
    case GetUsers                    => sender() ! inMemoryUsers
    case GetUserByUsername(username) => sender() ! inMemoryUsers.find(_.username == username)
    // Match projected event and update offset
    case (seqNo: Long, UserAdded(user)) =>
      saveSnapshot(LastProcessedEventOffset(seqNo))
      inMemoryUsers += user
  }
}

Hay algunos problemas como: la secuencia de eventos parece ser lenta. Es decir. UserRead El actor puede responder con un conjunto de usuarios antes de que se guarde el usuario recién agregado.

EDIT 2

Aumenté el intervalo de actualización del diario de consulta de Cassandra, que resuelve menos problemas con el flujo de eventos lentos. Parece que el diario de eventos Cassandra está por defecto, sondeándose cada 3 segundos. En mi application.conf Yo añadí:

cassandra-query-journal {
  refresh-interval = 20ms
}

EDIT 3

En realidad, no disminuya el intervalo de actualización. Eso aumentará el uso de la memoria, pero eso no es peligroso, ni un punto. En general, el concepto de CQRS es que el lado de escritura y el de lectura son asincrónicos. Por lo tanto, después de escribir datos, nunca estarán disponibles de inmediato para su lectura. ¿Cómo lidiar con UI? Simplemente abro la transmisión y envío de datos a través de eventos enviados por el servidor después de que el lado de lectura los acuse.


7
2017-07-07 13:28


origen


Respuestas:


Hay algunas maneras de hacer esto. Por ejemplo, en mi aplicación tengo un actor en mi lado de la consulta que tiene un PersistenceQuery que constantemente busca cambios, pero también puede tener un hilo con la misma consulta. Lo importante es mantener la secuencia abierta para poder leer el evento persistente tan pronto como suceda

val readJournal =
PersistenceQuery(system).readJournalFor[CassandraReadJournal](
  CassandraReadJournal.Identifier)

// issue query to journal
val source: Source[EventEnvelope, NotUsed] =
  readJournal.eventsByPersistenceId(s"MyActorId", 0, Long.MaxValue)

// materialize stream, consuming events
implicit val mat = ActorMaterializer()
source.map(_.event).runForeach{
  case userEvent: UserEvent => {
    doSomething(userEvent)
  }
}

En lugar de esto, puede tener un temporizador que genere un PersistenceQuery y almacene nuevos eventos, pero creo que tener una transmisión abierta es la mejor manera.


4
2017-07-11 10:58



Aunque la solución con PersistenceQuery solo se aprobó, contiene los siguientes problemas:

  1. Es parcial, solo hay un método para leer EventEnvelopes presentado.
  2. No puede funcionar con instantáneas de estado y, como resultado, CQRS Reader Part debería terminar todos los eventos persistentes persistieron alguna vez.

La primera solución es mejor, pero tiene los siguientes problemas:

  1. Es muy complicado. Causa al usuario el manejo innecesario de los números de secuencia.
  2. El código trata con el estado (consulta / actualización) también junto con la implementación de Actores.

Existe existe uno más simple:

import akka.NotUsed
import akka.actor.{Actor, ActorLogging}
import akka.persistence.query.{EventEnvelope, PersistenceQuery}
import akka.persistence.query.javadsl.{EventsByPersistenceIdQuery, ReadJournal}
import akka.persistence._
import akka.stream.ActorMaterializer
import akka.stream.javadsl.Source

/**
  * Created by alexv on 4/26/2017.
  */
class CQRSTest {

  // User Command, will be transformed to User Event
  sealed trait UserCommand
  // User Event
  // let's assume some conversion from Command to event here
  case class PersistedEvent(command: UserCommand) extends Serializable
  // User State, for simplicity assumed that all State will be snapshotted
  sealed trait State extends Serializable{
    def clear(): Unit
    def updateState(event: PersistedEvent): Unit
    def validateCommand(command:UserCommand): Boolean
    def applyShapshot(newState: State): Unit
    def getShapshot() : State
  }
  case class SaveSnapshot()

  /**
    * Common code for Both reader and writer
    * @param state - State
    */
  abstract class CQRSCore(state: State) extends PersistentActor with ActorLogging {
    override def persistenceId: String = "CQRSPersistenceId"

    override def preStart(): Unit = {
      // Since the state is external and not depends to Actor's failure or restarts it should be cleared.
      state.clear()
    }

    override def receiveRecover: Receive = {
      case event : PersistedEvent => state.updateState(event)
      case SnapshotOffer(_, snapshot: State) => state.applyShapshot(snapshot)
      case RecoveryCompleted  => onRecoveryCompleted(super.lastSequenceNr)
    }

    abstract def onRecoveryCompleted(lastSequenceNr:Long)
  }

  class CQRSWriter(state: State) extends CQRSCore(state){
    override def preStart(): Unit = {
      super.preStart()
      log.info("CQRSWriter Started")
    }

    override  def onRecoveryCompleted(lastSequenceNr: Long): Unit = {
      log.info("Recovery completed")
    }

    override def receiveCommand: Receive = {
      case command: UserCommand =>
        if(state.validateCommand(command)) {
          // Persist events and call state.updateState with each persisted event
          persistAll(List(PersistedEvent(command)))(state.updateState)
        }
        else {
          log.error("Validation Failed for Command: {}", command)
        }
      case SaveSnapshot => saveSnapshot(state.getShapshot())
      case SaveSnapshotSuccess(metadata) => log.debug("Saved snapshot successfully: {}", metadata)
      case SaveSnapshotFailure(metadata, reason) => log.error("Failed to Save snapshot: {} . Reason: {}", metadata, reason)
    }
  }

  class CQRSReader(state: State) extends CQRSCore(state){
    override def preStart(): Unit = {
      super.preStart()
      log.info("CQRSReader Started")
    }

    override  def onRecoveryCompleted(lastSequenceNr: Long): Unit = {
      log.info("Recovery completed, Starting QueryStream")

      // ReadJournal type not specified here, so may be used with Cassandra or In-memory Journal (for Tests)
      val readJournal = PersistenceQuery(context.system).readJournalFor(
        context.system.settings.config.getString("akka.persistence.query.my-read-journal"))
        .asInstanceOf[ReadJournal
        with EventsByPersistenceIdQuery]
      val source: Source[EventEnvelope, NotUsed] = readJournal.eventsByPersistenceId(
        OrgPersistentActor.orgPersistenceId, lastSequenceNr + 1, Long.MaxValue)
      source.runForeach({ envelope => state.updateState(envelope.event.asInstanceOf[PersistedEvent]) },ActorMaterializer())

    }

    // Nothing received since it is Reader only
    override def receiveCommand: Receive = Actor.emptyBehavior
  }
}

3
2018-04-26 13:29