Pregunta Creando Observable sin usar Observable.create


Estoy usando RxJava en mi aplicación de Android y quiero cargar datos de la base de datos.

De esta manera, estoy creando un nuevo Observable usando Observable.create() que devuelve una lista de EventLog

public Observable<List<EventLog>> loadEventLogs() {
    return Observable.create(new Observable.OnSubscribe<List<EventLog>>() {
        @Override
        public void call(Subscriber<? super List<EventLog>> subscriber) {
            List<DBEventLog> logs = new Select().from(DBEventLog.class).execute();
            List<EventLog> eventLogs = new ArrayList<>(logs.size());
            for (int i = 0; i < logs.size(); i++) {
                eventLogs.add(new EventLog(logs.get(i)));
            }
            subscriber.onNext(eventLogs);
        }
    });
}

Aunque funciona correctamente, leo que usando Observable.create() no es realmente una mejor práctica para Rx Java (ver aquí)

Así que cambié este método de esta manera.

public Observable<List<EventLog>> loadEventLogs() {
    return Observable.fromCallable(new Func0<List<EventLog>>() {
        @Override
        public List<EventLog> call() {
            List<DBEventLog> logs = new Select().from(DBEventLog.class).execute();
            List<EventLog> eventLogs = new ArrayList<>(logs.size());
            for (int i = 0; i < logs.size(); i++) {
                eventLogs.add(new EventLog(logs.get(i)));
            }
            return eventLogs;
        }
    });
}

¿Es este un mejor enfoque usando Rx Java? ¿Por qué? ¿Cuál es realmente la diferencia entre los dos métodos?

Además, dado que la base de datos carga una lista de elementos, ¿tiene sentido emitir toda la lista a la vez? ¿O debería emitir un artículo a la vez?


32
2017-12-10 14:45


origen


Respuestas:


Los dos métodos pueden parecer similares y comportarse de manera similar, pero fromCallable se ocupa de las dificultades de la contrapresión para usted, mientras que el create la versión no. Tratar con la contrapresión dentro de un OnSubscribe la implementación abarca desde la fusión mental simple hasta la completa; sin embargo, si se omite, puede obtener MissingBackpressureExceptions a lo largo de los límites asincrónicos (como observeOn) o incluso en límites de continuación (como concat)

RxJava intenta ofrecer soporte de contrapresión adecuado para la mayor cantidad posible de fábricas y operadores, sin embargo, hay bastantes fábricas y operadores que no pueden soportarlo.

El segundo problema con el manual OnSubscribe implementación es la falta de soporte de cancelación, especialmente si genera una gran cantidad de onNext llamadas. Muchos de estos pueden ser reemplazados por métodos estándar de fábrica (como from) o clases de ayuda (como SyncOnSubscribe) que tratan con toda la complejidad para ti.

Puede encontrar muchas presentaciones y ejemplos que (aún) usan create por dos razones.

  1. Es mucho más fácil introducir flujos de datos basados ​​en push mostrando cómo el impulso de los eventos funciona de manera imperativa. En mi opinión, esas fuentes pasan demasiado tiempo con create proporcionalmente, en lugar de hablar sobre los métodos de fábrica estándar y cómo ciertas tareas comunes (como la suya) se pueden lograr de forma segura.
  2. Muchos de estos ejemplos se crearon en el momento en que RxJava no requería soporte de contrapresión o incluso compatibilidad de cancelación síncrona adecuada o simplemente se transfirieron de los ejemplos de Rx.NET (que hasta la fecha no admite contrapresión y la cancelación sincrónica funciona de alguna manera, cortesía de C # I adivinar). La generación de valores al invocar a Next no tenía preocupaciones en ese momento. Sin embargo, tal uso conduce a la saturación del buffer y al uso excesivo de la memoria, por lo tanto, el equipo de Netflix ideó una forma de limitar el uso de la memoria al requerir que los observadores establezcan cuántos elementos están dispuestos a realizar. Esto se conoce como contrapresión.

Para la segunda pregunta, es decir, si uno debe crear una Lista o una secuencia de valores, depende de su fuente. Si su fuente admite algún tipo de iteración o transmisión de elementos de datos individuales (como JDBC), puede conectarlo y emitir uno por uno (ver SyncOnSubscribe) Si no lo admite o si lo necesita en forma de lista de todos modos, guárdelo como está. Siempre puede convertir entre las dos formas a través de toListy flatMapIterable si necesario.


34
2017-12-10 16:03



Como se explica en la respuesta enlazaste, con Observable.create es posible que deba violar los requisitos avanzados de RxJava.

Por ejemplo, necesitarás implementar contrapresión, o cómo darse de baja

En su caso, desea emitir un artículo, sin tener que lidiar con contrapresión o suscripción. Asi que Observable.fromCallable es una buena llamada RxJava se ocupará del resto.


2
2017-12-10 15:34