介绍

操作符map,字面理解一下,就是映射,那么这个操作符如何使用呢?

举例说明

代码示例

Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        Logger("Emit 1");
        e.onNext(1);
        Logger("Emit 2");
        e.onNext(2);
        e.onComplete();
    }
});

Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Logger("onSubscribe");
    }

    @Override
    public void onNext(String object) {
           Logger(" onNext " +  object);
    }

    @Override
    public void onError(Throwable e) {
        Logger("onError e = " + e.getMessage());
    }

    @Override
    public void onComplete() {
        Logger("onComplete");
    }
};
observable.subscribeOn(Schedulers.io())
        .map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "i am " + integer;
            }
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(observer);

运行结果

这里写图片描述

可以看到首先执行的是onSubscribe方法,然后发射两个onNext事件,两个Integer参数经过转换之后成为String类型,然后传递给Observer作出动作,这就是Map的作用。

源码浅析

下面是map操作符的源码。

    //观察map中Function的两个参数,一个是T也就是变化前的类型,R为变化后的类型,最后返回的是R类型的Observerable(姑且这么说)
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        //常规非空检查
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        //又是onAssembly,不重要,关注ObservableMap
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

看下ObservableMap的构造方法,传入的参数,source为转换前的Observable< T >, 第二个参数就是我们得function。 ObservableMap继承AbstractObservableWithUpstream,而后者继承Observable,可以看出来,经过map转换后的Observable对象就是ObservableMap类型。

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

既然转换后是ObservableMap类型,那么必然存在subscribeActual方法,果然

    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

看看MapObserver是个什么东西?

   //MapObserver继承BasicFuseableObserver,而后者实现了Observer接口
   static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        //map操作符中的function
        final Function<? super T, ? extends U> mapper;
        //actual是栗子中我们后面subscribe的Observer
        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            //done在执行了onError或者onComplete之后为true,具体查看父类BasicFuseableObserver。done为true时,不做处理。
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }

            //U类型,转换后的类型,栗子中为String
            U v;

            try {
                //常规非空检查,但是里面有个apply的操作,这个apply的操作就是我们上面栗子中把Integer转成String的操作。v就是转换后的String类型
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            //Observer执行onNext操作,v为转换后的类型,如栗子中的String
            actual.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Exception {
            T t = qs.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }

为什么发射之后Observer的onNext没有立刻执行,因为上文例子中subscribeOn和observeOn所在的线程不一样。

results matching ""

    No results matching ""