EventBus是一个基于订阅者-发布者模式框架,该模式定义了一种一对多的依赖关系,让多个订阅者对象同时监听一个对象,通过这种方式对订阅者和主题发布者进行充分解耦,主要用于Android组件间相互通信、线程间互相通信及其他线程与UI线程之间互相通信等。代替了传统的HandlerBroadCastReceiverInterface回调等通信方式,相比之下EventBus的优点是代码简洁,使用简单,并将事件发布和订阅充分解耦。

一 基本使用

1自定义一个数据类



public class MessageEvent {
    public String getEventMessage() {
        return eventMessage;
    }

    private final String eventMessage;

    public MessageEvent(String eventMessage) {
        this.eventMessage = eventMessage;
    }
}

2注册、反注册

 		@Override
    public void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        EventBus.getDefault().register(this);
    }

    @Override
    public void onStop() {
        super.onStop();
        EventBus.getDefault().unregister(this);
    }

3添加处理事件的方法

 @Subscribe
    public void receiveMessage(MessageEvent messageEvent) {
        receivedMessage.setText(messageEvent.getEventMessage());
    }


4发送事件

 private void sendMessage() {
        EventBus.getDefault().post(new MessageEvent("这是来自FragmentA的消息!"));
    }

整个流程如下图所示:

 

那么接下来我们根据上图展示的事件总线来深入源码理解一下其实现方法。

二 Subscribe注解

@Documented
@Retention(RetentionPolicy.RUNTIME) 
@Target({ElementType.METHOD}) 
public @interface Subscribe {

    // 指定事件订阅方法所在的线程模式,也就是决定订阅方法是在哪个线程,默认POSTING模式
    ThreadMode threadMode() default ThreadMode.POSTING;

    // 是否支持粘性事件
    boolean sticky() default false;

    // 优先级,如果指定了优先级,则若干方法接收同一事件时,优先级高的方法会先接收到。
    int priority() default 0;
}

ThreadMode可以指定的模式有:

  1. ThreadMode.POSTING:默认的线程模式,在哪个线程发送事件就在对应线程处理事件,避免了线程切换,效率高。
  2. ThreadMode.MAIN:如在主线程(UI线程)发送事件,则直接在主线程处理事件;如果在子线程发送事件,则先将事件入队列,然后通过 Handler 切换到主线程,依次处理事件。
  3. ThreadMode.MAIN_ORDERED:无论在哪个线程发送事件,都将事件加入到队列中,然后通过Handler切换到主线程,依次处理事件。
  4. ThreadMode.BACKGROUND:与ThreadMode.MAIN相反,如果在子线程发送事件,则直接在子线程处理事件;如果在主线程上发送事件,则先将事件入队列,然后通过线程池处理事件。
  5. ThreadMode.ASYNC:与ThreadMode.MAIN_ORDERED相反,无论在哪个线程发送事件,都将事件加入到队列中,然后通过线程池执行事件

三 register注册

 先看 EventBus 注册

EventBus.getDefault().register(this);

 

public static EventBus getDefault() {
        
    if (defaultInstance == null) {
         synchronized (EventBus.class) {
             if (defaultInstance == null) {
                 defaultInstance = new EventBus();
             }
         }
    }
    return defaultInstance;
}

getDefault()其实就是一个单例模式,创建EventBus实例对象,并返回 ,我们看看其构造方法的源码:

public EventBus() {
        this(DEFAULT_BUILDER);
    }

    EventBus(EventBusBuilder builder) {
        subscriptionsByEventType = new HashMap<>();
        typesBySubscriber = new HashMap<>();
        stickyEvents = new ConcurrentHashMap<>();
        mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
        backgroundPoster = new BackgroundPoster(this);
        asyncPoster = new AsyncPoster(this);
        indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
        subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
                builder.strictMethodVerification, builder.ignoreGeneratedIndex);
        logSubscriberExceptions = builder.logSubscriberExceptions;
        logNoSubscriberMessages = builder.logNoSubscriberMessages;
        sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
        sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
        throwSubscriberException = builder.throwSubscriberException;
        eventInheritance = builder.eventInheritance;
        executorService = builder.executorService;
    }

可以看到,EventBusBuilder中主要是对EventBus的一些基础的配置信息.

再看register

public void register(Object subscriber) {
     Class<?> subscriberClass = subscriber.getClass(); // 获取传入的要注册类的字节码文件
     List<SubscriberMethod> subscriberMethods = 
     subscriberMethodFinder.findSubscriberMethods(subscriberClass); //1
     
     synchronized (this) {

         // 遍历订阅方法封装类的集合
         for (SubscriberMethod subscriberMethod : subscriberMethods) {
              subscribe(subscriber, subscriberMethod); //4
         }
     }
}
public class SubscriberMethod {

   //订阅方法的信息类,包含Method对象
    final Method method;
   //线程模式
    final ThreadMode threadMode;
   //事件类型 
    final Class<?> eventType;
   //优先级
    final int priority;
   // 是否是粘性
    final boolean sticky;
   
    String methodString;

    public SubscriberMethod(Method method, Class<?> eventType, ThreadMode threadMode, int priority, boolean sticky) {
        this.method = method;
        this.threadMode = threadMode;
        this.eventType = eventType;
        this.priority = priority;
        this.sticky = sticky;
    }

   
}

从上面的图可以看出,SubscriberMethod类主要描述的是订阅方法,即订阅者中用来订阅Event的方法

findSubscriberMethods2件事

1 调用 findSubscirberMethods,获取注册类上所有订阅方法的信息集合

2 遍历这个信息集合,给2个map填充数据:

明确了SubscriberMethod类的内容之后我们回过头来来看看findSubscriberMethods(Class<?> subscriberClass)方法的逻辑:

/**
 *
 * 获取当前要进行注册类中的所有订阅方法
 */
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {

    // METHOD_CACHE: 是一个ConcurrentHashMap,key是要注册类的字节码文件,value是这个字节码文件里的所有订阅方法信息的集合
    List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass); // 这步实际上就是看看这个注册类的方法是否已经缓存了,缓存过就直接根据类返回
    if (subscriberMethods != null) {
        return subscriberMethods;
    }

    // EventBus是支持EventBusBuilder的,如果我们自定义了EventBusBuilder,则ignoreGeneratedIndex为true,否则为false,我们没自定义,所有看false
    if (ignoreGeneratedIndex) {
        subscriberMethods = findUsingReflection(subscriberClass);
    } else {

        //2
        subscriberMethods = findUsingInfo(subscriberClass);
    }
    
    // 如果该类没有找到订阅方法,抛出异常
    if (subscriberMethods.isEmpty()) {
        throw new EventBusException("Subscriber " + subscriberClass + " and its super classes have no public methods with the @Subscribe annotation");
    } else {
 
        // 将该注册类的类型为key, 将这个类所有注册方法的封装类集合为value存入map集合
        METHOD_CACHE.put(subscriberClass, subscriberMethods);
        return subscriberMethods;

        // ->> 返回register()方法中
    }
}

可以看到,首先有一个类型为Map<Class<?>, List<SubscriberMethod>> 的对象,其key类对象(Class)value是是SubscriberMethod对应的List集合。在findSubscriberMethods(Class<?> subscriberClass)方法中会首先从METHOD_CACHE 这个Map中根据订阅类类对象查询看是否之前已经存在该订阅者订阅方法集合了,如果已经存在(应该是之前缓存的),直接从METHOD_CACHE 这个Map中拿到对应的List并返回,如果之前没有缓存过即METHOD_CACHE 这个Map中没有以当前subscriberClasskey的键值对,则需要从subscriberClass类中去找订阅方法

 

我们再看findUsingInfo(subscriberClass)方法:

private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
  
    // FindState辅助我们查找订阅方法的类,后面会讲述
    FindState findState = prepareFindState();
    findState.initForSubscriber(subscriberClass);

    // findState.clazz就是我们的注册类subscriberClass
    while (findState.clazz != null) {

        findState.subscriberInfo = getSubscriberInfo(findState);

        // 该类第一次注册时,findState.subscriberInfo为null, 我们走false
        if (findState.subscriberInfo != null) {
                SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
                for (SubscriberMethod subscriberMethod : array) {
                    if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                        findState.subscriberMethods.add(subscriberMethod);
                    }
                }
        } else {

            //3
            findUsingReflectionInSingleClass(findState);
        }

        // 修改findState.clazz为subscriberClass的父类Class,即需要遍历父类
        findState.moveToSuperclass();
    }

     // 将查找到的方法保存在了FindState实例的subscriberMethods集合中。然后使用subscriberMethods构建一个新的List<SubscriberMethod>并返回,最后释放掉findState
    return getMethodsAndRelease(findState);

    // ->> 返回到findSubscriberMethods() 方法中
}

 

可以看到,首先调用prepareFindState()获取了一个FindState对象,看一下prepareFindState()

 class SubscriberMethodFinder{
   ...
     	private static final int POOL_SIZE = 4;
      private static final FindState[] FIND_STATE_POOL = new FindState[POOL_SIZE];

      private FindState prepareFindState() {
              synchronized (FIND_STATE_POOL) {
                  for (int i = 0; i < POOL_SIZE; i  ) {
                      FindState state = FIND_STATE_POOL[i];
                      if (state != null) {
                          FIND_STATE_POOL[i] = null;
                          return state;
                      }
                  }
              }
              return new FindState();
          }
   
 }

prepareFindState()方法中首先从一个已有的FindState[] FIND_STATE_POOL对象池中找,看对象池中有没有哪一个位置是**非空(null)**的,如果有,就将该位置的值置为空(null),然后将该位置原来的对象返回,如果最后发现对象池中没有一个位置的对象值为null即对象池中不存在可用的对象,再new一个新的FindState对象并返回,这是一种经典的使用对象池达到对象复用以减少内存开销的方法

.我们看一下这个FindState类到底是个什么东西:

上图分析首先调用methods = findState.clazz.getDeclaredMethods()findState.clazz是什么?我们来看看源码:

FindState{
      Class<?> subscriberClass;
      //订阅者对象
      Class<?> clazz;
      boolean skipSuperClasses;
      SubscriberInfo subscriberInfo;

      void initForSubscriber(Class<?> subscriberClass) {
          this.subscriberClass = clazz = subscriberClass;
          skipSuperClasses = false;
          subscriberInfo = null;
      }
}

findState.clazz就是刚才我们的订阅者对象findUsingReflectionInSingleClass()调用了它的getDeclaredMethods()方法,

FindState.moveToSuperclass()

们看这个方法的源码:

void moveToSuperclass() {
            if (skipSuperClasses) {
                clazz = null;
            } else {
                clazz = clazz.getSuperclass();
                String clazzName = clazz.getName();
                /** Skip system classes, this just degrades performance. */
                if (clazzName.startsWith("java.") || clazzName.startsWith("javax.") || clazzName.startsWith("android.")) {
                    clazz = null;
                }
            }
        }

首先判断了skipSuperClasses的值,从FindState的构造方法中:

void initForSubscriber(Class<?> subscriberClass) {
            this.subscriberClass = clazz = subscriberClass;
            skipSuperClasses = false;
            subscriberInfo = null;
        }

我们可以看到该值默认为false,那么就会执行:

 

首先明确clazz原本的值是什么:

 void initForSubscriber(Class<?> subscriberClass) {
            this.subscriberClass = clazz = subscriberClass;
            skipSuperClasses = false;
            subscriberInfo = null;
        }

可以看到clazz的值为initForSubscriber()中传入的参数,回想一下我们是在哪里调用initForSubscriber()的:

Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods =subscriberMethodFinder.findSubscriberMethods(subscriberClass);
subscriberMethods = findUsingInfo(subscriberClass);
findState.initForSubscriber(subscriberClass);

所以,这个clazz就是subscriberClass,也就是我们的订阅类

继续看这段代码:

    clazz = clazz.getSuperclass();
    String clazzName = clazz.getName();
    /** Skip system classes, this just degrades performance. */
    if (clazzName.startsWith("java.") || clazzName.startsWith("javax.") || clazzName.startsWith("android.")) {
      clazz = null;
    }

首先使用clazz = clazz.getSuperclass()获取到当前订阅者类的父类,然后使用String clazzName = clazz.getName()获取当前订阅者类的父类的类名,然后判断:

 if (clazzName.startsWith("java.") || clazzName.startsWith("javax.") || clazzName.startsWith("android."))

 

就是判断了一下当前订阅者类的父类是否是以javajavaxandroid开头的,也就是判断当前类的父类是否是android本身的类,如果是,则将clazz置为null

所以这个void moveToSuperclass()方法的作用就是将findState中的clazz置为当前订阅者类的父类,当然,如果当前订阅者类没有父类或者当前订阅者类的父类是系统的类,则将clazz置为空(null)。

我们分析完了 findState.moveToSuperclass(),注意,执行完了 findState.moveToSuperclass()之后相当于第一次while循环执行完了,现在会去进行下一个while循环的判断,那么while进入循环的条件是什么?是findState.clazz != null,所以,如果之前订阅者类没有父类(或者有父类但是父类是android自身的类),则这个循环就会跳出,否则如果之前订阅者类还有父类,就会进入下一个循环,其实是一个递归的过程,从初始订阅者类开始,一级一级向上遍历父类,直到最顶级。

然后,while循环完了之后,会返回getMethodsAndRelease(findState)

SubscriberMethodFinder.getMethodsAndRelease(FindState findState);

我们看看这个方法的源码:

 private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
        List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
        findState.recycle();
        synchronized (FIND_STATE_POOL) {
            for (int i = 0; i < POOL_SIZE; i  ) {
                if (FIND_STATE_POOL[i] == null) {
                    FIND_STATE_POOL[i] = findState;
                    break;
                }
            }
        }
        return subscriberMethods;
    }

这个方法首先拿到了findState.subscriberMethods,也就是订阅者中的所有订阅方法的List,注意这里是new了一个List并将findState.subscriberMethods的值拷贝了过去,而不是简单地使用=,这样做的目的是为了下一步对findState.subscriberMethods做修改时不会改变这里的subscriberMethods,也就是findState.recycle()

void recycle() {
  subscriberMethods.clear();
  anyMethodByEventType.clear();
  subscriberClassByMethodKey.clear();
  methodKeyBuilder.setLength(0);
  subscriberClass = null;
  clazz = null;
  skipSuperClasses = false;
  subscriberInfo = null;
}

可以看到,这个recycle()方法其实是将当前findState恢复成初始状态,这里使用了 subscriberMethods.clear()清空了subscriberMethods,如果之前简单使用=,则这里执行clear之后subscriberMethods也会被clear掉,这涉及到java的引用机制,这里不展开赘述。

执行findState.recycle()findState恢复初态之后,执行了:

synchronized (FIND_STATE_POOL) {
            for (int i = 0; i < POOL_SIZE; i  ) {
                if (FIND_STATE_POOL[i] == null) {
                    FIND_STATE_POOL[i] = findState;
                    break;
                }
            }
        }

这个逻辑是否有点眼熟?对的,它和前面介绍的:

class SubscriberMethodFinder{
   ...
     	private static final int POOL_SIZE = 4;
      private static final FindState[] FIND_STATE_POOL = new FindState[POOL_SIZE];

      private FindState prepareFindState() {
              synchronized (FIND_STATE_POOL) {
                  for (int i = 0; i < POOL_SIZE; i  ) {
                      FindState state = FIND_STATE_POOL[i];
                      if (state != null) {
                          FIND_STATE_POOL[i] = null;
                          return state;
                      }
                  }
              }
              return new FindState();
          }
   
 }

中的:

 synchronized (FIND_STATE_POOL) {
                  for (int i = 0; i < POOL_SIZE; i  ) {
                      FindState state = FIND_STATE_POOL[i];
                      if (state != null) {
                          FIND_STATE_POOL[i] = null;
                          return state;
                      }
                  }
              }

逻辑是类似的,只不过prepareFindState()中是从对象池中拿出了一个findState并将其对应的位置置为null,相当于借出getMethodsAndRelease()是查看对象池中的空缺位置然后将现在不用的findState放进去,相当于归还。借出时将其坑位置为null,归还时再将坑位补上,完美地利用了内存,一点也没浪费,不得不佩服作者的神勇。

归还完findState之后 private List<SubscriberMethod> getMethodsAndRelease(FindState findState)会执行最后一行:

 return subscriberMethods;

分析3 处

/**
 * 分析3:findUsingReflectionInSingleClass()
 * 作用:通过反射获取订阅方法的信息
 */
private void findUsingReflectionInSingleClass(FindState findState) {
    Method[] methods;
    try {
         // 通过反射获取订阅类中的所有方法
         methods = findState.clazz.getDeclaredMethods();
     } catch (Throwable th) {
         ...
     }

     // 遍历方法
     for (Method method : methods) {

        // 获取方法修饰符
        int modifiers = method.getModifiers();

        // 方法是public类型,但非abstract、static等
        if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
            
            // 获取方法的修饰类型
            Class<?>[] parameterTypes = method.getParameterTypes();
            
            // 只能是1个参数
            if (parameterTypes.length == 1) {

                // 获取方法上的名为Subscribe的注解
                Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                
                // 如果该方法带Subscribe注解
                if (subscribeAnnotation != null) {
                       
                    // 获取该订阅方法上的第一个参数类型,也就是订阅的事件类型
                    Class<?> eventType = parameterTypes[0];
                     
                    // checkAdd()方法用来判断FindState中是否已经添加过将该事件类型为key的键值对,没添加过则返回true
                    if (findState.checkAdd(method, eventType)) {
                            
                        // 获取线程模式
                        ThreadMode threadMode = subscribeAnnotation.threadMode();

                        // 将该订阅方法,事件类型,线程模式,优先级,是否支持粘性事件等信息,封装成SubscriberMethod对象,并添加到findState中的subscriberMethods集合里
                        findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode, subscribeAnnotation.priority(), subscribeAnnotation.sticky()));

                         // ->> 返回到findUsingInfo() 方法中
                    }
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                ...
            }
        } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
            ...
        }
    }
}

根据反射,获取订阅方法的信息数据,然后将它分封装成SubscriberMethod对象,并添加到findState的集合中。

/**
 * 分析4:subscribe()
 * 作用:主要就是构建2个map对象
 */
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
    
    // 获取该订阅方法的事件类型
    Class<?> eventType = subscriberMethod.eventType;
        
    // 将订阅方法的封装类,再进行封装,也就是注册类的信息也存入了    
    Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
    
    // subscriptionsByEventType是hashmap, 以事件类型为key, Subscription集合为value
    // 先查找subscriptionsByEventType是否存在以当前事件类型为key的值
    CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);

    // 如果没有的话   
    if (subscriptions == null) {

        // 创建集合,根据事件类型,合并数据
        subscriptions = new CopyOnWriteArrayList<>();
        subscriptionsByEventType.put(eventType, subscriptions);
    } else {
        if (subscriptions.contains(newSubscription)) {
            throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType);
        }
    }

    // 添加上边创建的newSubscription对象到subscriptions中
    int size = subscriptions.size();
    for (int i = 0; i <= size; i++) {
  
        // 根据优先级进行排序
        if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
            subscriptions.add(i, newSubscription);
            break;
        }
    }
 
    // typesBySubscriber也是一个HashMap,保存了以当前要注册类的对象为key,注册类中订阅事件的方法的参数类型的集合为value的键值对
    // 和上面一样,根据key先判断,是否已经存储过了,如果已经存储过了,直接取出订注册类中订阅事件的方法的参数类型的集合
    List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
    if (subscribedEvents == null) {
        subscribedEvents = new ArrayList<>();
        typesBySubscriber.put(subscriber, subscribedEvents);
    }
    subscribedEvents.add(eventType);

    // 是否支持粘性事件
    if (subscriberMethod.sticky) {
        
      
        ...
    }

总结一下

传入注册类信息,根据反射获取注册类上的所有方法,遍历这些方法,取出其中的订阅方法(条件是,一个参数,权限为public,使用了Subscribe标签)将方法的信息封装成SubscriberMethod对象,并存入集合,然后再遍历这个集合,取出其中的SubscriberMethod对象,再根据注册类的字节码文件,合并成Subscription对象,再根据event类型,进行重新分类,存入map subscriptionsByEventType中(key 为event, value 为List),再创建map typesBySubscriber, 注册类为key , list为value。 完事了。

四.unregister取消注册

EventBus.getDefault().unregister(this);
public synchronized void unregister(Object subscriber) {

    
    List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
    
    // 如果集合不为null    
    if (subscribedTypes != null) {
            
         // 遍历集合,获取订阅事件的类型
         for (Class<?> eventType : subscribedTypes) {
              
              
              unsubscribeByEventType(subscriber, eventType);
         }
         typesBySubscriber.remove(subscriber);
     } else {
          logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
     }
}

还记得我们分析注册时,创建的那2个map吗? 其中一个是typesBySubscriber,key是注册类,value是事件类型的集合(List), 这一步就是根据注册类获取该类所有订阅方法的事件类型。


private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
        
    // 根据事件类型,获取该事件类型所对应的订阅方法信息的集合
    List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    
    // 如果集合不为null
    if (subscriptions != null) {
            
        // 遍历,该事件类型所对应的订阅方法
        int size = subscriptions.size();
        for (int i = 0; i < size; i++) {
        
             // 获取Subscription对象,该对象包含了订阅方法的所有信息和注册类信息
             Subscription subscription = subscriptions.get(i);
             
             // 因为subscriptionsByEventType可不光包含了1个注册类的信息,所以要加下面的判读,如果该订阅方法所在的注册类是我们要解除的注册类的话
             if (subscription.subscriber == subscriber) {
                 subscription.active = false;

                 // 从集合中,将该订阅方法的信息删除掉
                 subscriptions.remove(i);
                 i--;
                 size--;
             }
        }
    }
}

五 post发布事件

然后我们分析一下post()方法的执行逻辑:

 EventBus.getDefault().post(new MessageEvent("这是来自FragmentA的消息!"));

public void post(Object event) {
       
    // 
    PostingThreadState postingState = currentPostingThreadState.get();
    
    // 获取postingState里面存的一个队列
    List<Object> eventQueue = postingState.eventQueue;

    // 将要发送的事件,存入队列中
    eventQueue.add(event);

    // 判断该事件是否正在发送,如果在发送,则跳过下面的逻辑
    if (!postingState.isPosting) {

        // 判断是否在主线程
        postingState.isMainThread = isMainThread();
        postingState.isPosting = true;
        if (postingState.canceled) {
            throw new EventBusException("Internal error. Abort state was not reset");
        }
        try {

             // 遍历队列
             while (!eventQueue.isEmpty()) {
             
                 // ->>5
                 postSingleEvent(eventQueue.remove(0), postingState);
             }
        } finally {
            
            // 重置状态
            postingState.isPosting = false;
            postingState.isMainThread = false;
        }
    }
}

首先执行了PostingThreadState postingState = currentPostingThreadState.get(),看一下这个currentPostingThreadState是什么:

 private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
        @Override
        protected PostingThreadState initialValue() {
            return new PostingThreadState();
        }
    };

这是一个ThreadLocal<PostingThreadState>类型的对象,主要是为每个线程存储一个唯一的PostingThreadState对象,看看PostingThreadState是什么:

    /** For ThreadLocal, much faster to set (and get multiple values). */
    final static class PostingThreadState {
        final List<Object> eventQueue = new ArrayList<>();
        boolean isPosting;
        boolean isMainThread;
        Subscription subscription;
        Object event;
        boolean canceled;
    }

里面有一个List以及一些其他的成员变量。

那么这个PostingThreadState postingState = currentPostingThreadState.get()语句做的就是获取到当前线程对应的PostingThreadState对象。

然后执行 List<Object> eventQueue = postingState.eventQueue拿到了postingState对象中的那个List,然后执行eventQueue.add(event)将当前事件添加进了eventQueue这个List中。

然后执行:

if (!postingState.isPosting) {
            postingState.isMainThread = isMainThread();
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                while (!eventQueue.isEmpty()) {
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }

首先判断postingState.isPosting的真值,从PostingThreadState的源码我们可以看到这个isPosting默认是false的,所以会进入if语句块内执行相应逻辑。

首先:

postingState.isMainThread = isMainThread();
postingState.isPosting = true;

postingState.isMainThreadpostingState.isPosting赋值。

然后判断postingState.canceled,从PostingThreadState源码知其默认为false,不会触发异常。然后遍历eventQueue这个List,执行了:

 

EventBus.postSingleEvent(Object event, PostingThreadState postingState)


private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
    Class<?> eventClass = event.getClass();
    boolean subscriptionFound = false;

    // 是否要查看所有的继承关系
    if (eventInheritance) {
           
         // 通过lookupAllEventTypes()拿到该事件所有的父类事件类型
         List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
         int countTypes = eventTypes.size();

         // 遍历事件类型
         for (int h = 0; h < countTypes; h++) {
             Class<?> clazz = eventTypes.get(h);

             // ->> 6
             subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
        }
    } else {
        subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
    }
     
    if (!subscriptionFound) {
           
        if (logNoSubscriberMessages) {
            logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
        }
            
        // 如果我们没有订阅事件,则发送NoSubscriberEvent
        if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class && eventClass != SubscriberExceptionEvent.class) {
            post(new NoSubscriberEvent(this, event));
        }
    }
}

postSingleEvent()方法中,根据eventInheritance属性,决定是否向上遍历事件的父类型,然后用postSingleEventForEventType()方法进一步处理事件。

/**
 * 分析6
 */
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {

    CopyOnWriteArrayList<Subscription> subscriptions;
    synchronized (this) {

        // 还记得注册时构建的map subscriptionsByEventType吗?对,这步就是根据事件类型,获取它所对应的List<subscription>也就是订阅方法集合
        subscriptions = subscriptionsByEventType.get(eventClass);
    }
    
    // 如果集合不为空
    if (subscriptions != null && !subscriptions.isEmpty()) {
        
        // 遍历集合,取出Subscription(订阅方法信息包装类)    
        for (Subscription subscription : subscriptions) {
                
            // 记录事件
            postingState.event = event;
            postingState.subscription = subscription;
            boolean aborted = false;
            try {
 
                // 处理事件 ->> 分析7
                postToSubscription(subscription, event, postingState.isMainThread);
                aborted = postingState.canceled;
            } finally {
                postingState.event = null;
                postingState.subscription = null;
                postingState.canceled = false;
            }
            if (aborted) {
                break;
            }
        }
        return true;
    }
    return false;
}

postSingleEventForEventType 就是根据事件类型,在subscriptionsByEventType中找到对应的订阅方法信息的集合,然后遍历集合,拿到订阅方法信息的封装类,调用postToSubscription去执行。

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {

    // 根据订阅方法设置的线程模式去执行
    switch (subscription.subscriberMethod.threadMode) {
    
        // 默认线程模式,在哪个线程发送事件,就在哪个线程接收事件
        case POSTING:

            // ->>8 
            invokeSubscriber(subscription, event);
            break;

        // 如果是主线程,则直接执行,子线程加入队列,然后通过 Handler 切换到主线程执行
        case MAIN:
            if (isMainThread) {

                // 主线程,直接反射执行
                invokeSubscriber(subscription, event);
            } else {

                // ->> 9
                mainThreadPoster.enqueue(subscription, event);
            }
            break;

        // 无论在哪个线程,都加队列,通过handler 在主线程执行
        case MAIN_ORDERED:
            
         
            if (mainThreadPoster != null) {
                mainThreadPoster.enqueue(subscription, event);
            } else {
                invokeSubscriber(subscription, event);
            }
            break;

        // 如果在子线程中,直接执行,如果在主线程中,加入队列,通过线程池执行
        case BACKGROUND:
                
            if (isMainThread) {

                // ->> 13
                backgroundPoster.enqueue(subscription, event);
            } else {
 
                // 在子线程,直接反射执行
                invokeSubscriber(subscription, event);
            }
            break;

        // 无论在哪个线程执行,都加入队列,用线程池执行
        case ASYNC:
             
             // AsyncPoster和backgroundPoster类型,但是AsyncPoster没有加同步锁,这也就造成了,它每次执行一个任务,都会开一个子线程,而backgroundPoster不会
             asyncPoster.enqueue(subscription, event);
             break;
        default:
             throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
    }
}

判断了当前订阅方法的threadMode的值,主要是按照订阅方法的线程值分为了四个主要部分处理,POSTING代表着当前线程,在哪个线程发送事件,就在哪个线程处理事件;MAIN代表只在主线程处理事件;BACKGROUND代表只在非主线程处理事件;ASYNC也是代表在非主线程处理事件。

  

/**
 * 分析8
 */
void invokeSubscriber(Subscription subscription, Object event) {

    try {
        subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
    } catch (InvocationTargetException e) {
        handleSubscriberException(subscription, event, e.getCause());
    } catch (IllegalAccessException e) {
        throw new IllegalStateException("Unexpected exception", e);
    }
}

调用subscription中订阅方法的invoke()方法,传入了订阅者类和事件对象两个参数,实际上就是调用了订阅方法,并将当前事件作为参数传递进去,这样订阅方法就实现了事件的接收。

/**
 * 分析9:mainThreadPoster为HandlerPoster, 具体分析下HandlerPoster
 */
 public class HandlerPoster extends Handler implements Poster {
 
    private final PendingPostQueue queue;
    private boolean handlerActive;
    ......
    public void enqueue(Subscription subscription, Object event) {
        // 用subscription和event封装一个PendingPost对象
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            // 加入到队列中
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                
                // sendMessage()发送处理事件的消息,handleMessage()方法将被执行,将子线程切换到主线程
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }

    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            // 遍历队列
            while (true) {
                // 出队列,取出PendingPost对象
                PendingPost pendingPost = queue.poll();
                ...
                // ->>12
                eventBus.invokeSubscriber(pendingPost);
                ...
            }
        } finally {
            handlerActive = rescheduled;
        }
    }
}

现在来看下 PendingPost

static PendingPost obtainPendingPost(Subscription subscription, Object event) {
        synchronized (pendingPostPool) {
            int size = pendingPostPool.size();
            if (size > 0) {
                PendingPost pendingPost = pendingPostPool.remove(size - 1);
                pendingPost.event = event;
                pendingPost.subscription = subscription;
                pendingPost.next = null;
                return pendingPost;
            }
        }
      return new PendingPost(event, subscription);
    }

首先使用synchronizedpendingPostPool对象进行了加锁,看一下这个pendingPostPool对象的声明:

 private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();

它是一个List对象,然后如果该Listsize不为0,则返回该List的最后一个元素,否则就new一个PendingPost对象并返回。

/**
 * 分析12
 */
void invokeSubscriber(PendingPost pendingPost) {
     
    // 取出事件类型
    Object event = pendingPost.event;

    // 取出订阅方法的信息封装类
    Subscription subscription = pendingPost.subscription;

    // 释放pendingPost引用的资源
    PendingPost.releasePendingPost(pendingPost);
    if (subscription.active) {

        // 通过反射调用执行该订阅方法
        invokeSubscriber(subscription, event);
    }
}

/**
 * 分析13
 */
final class BackgroundPoster implements Runnable, Poster {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    private volatile boolean executorRunning;

    BackgroundPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {

        // 用subscription和event封装一个PendingPost对象
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {

            // 加入队列
            queue.enqueue(pendingPost);
            if (!executorRunning) {
                executorRunning = true;

                // 调用newCachedThreadPool线程池,执行任务
                eventBus.getExecutorService().execute(this);
            }
        }
    }

    @Override
    public void run() {
        try {
            try {

                // 循环队列
                while (true) {
                    
                    // 等待1秒,取出PendingPost对象
                    PendingPost pendingPost = queue.poll(1000);
                    ...
                    // ->> 12(在上面)
                    eventBus.invokeSubscriber(pendingPost);
                }
            } catch (InterruptedException e) {
                eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
            }
        } finally {
            executorRunning = false;
        }
    }
}

总结一下

post也不难,首先是将发送的事件保存在postingState中的队列里面,它是线程独有的,然后遍历postingState中的事件队列,拿出该线程下,所有的事件的集合,然后遍历它,再根据subscriptionsByEventType,取出该事件所对应的所有订阅方法,然后看是否能够直接处理,如果能,直接反射调用订阅方法,如果不能,直接通过HandlerPower、BackgroundPower、AsyncPower切换线程后,再进行反射调用处理。

其中HandlerPower内部就直是封装了个Handler,每次调用的时候,先将事件加入到队列中,然后根据Handler切换到主线程,按顺序取出队列中的事件,反射执行

BackgroundPower是封装了catchThreadPool用于执行任务, AsyncPower与它类似,但是里面没有同步锁,每次执行都会新开辟一个子线程去执行任务。而BackgroundPower之会开一个线程

 

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐