需要掌握的前置知识
1. ThreadLocal
1.1ThreadLocal 是什么?
ThreadLocal翻译过来就是本地线程变量,意思是说,ThreadLocal 中填充的的是当前线程的变量,该变量对其他线程而言是封闭且隔离的,ThreadLocal 为变量在每个线程中创建了一个副本,这样每个线程都可以访问自己内部的副本变量,避免直接访问变量,导致变量被修改,提高效率,常用的场景有:
1、在进行对象跨层传递的时候,使用ThreadLocal可以避免多次传递,打破层次间的约束。
2、线程间数据隔离
3、进行事务操作,用于存储线程事务信息。
4、数据库连接,Session会话管理。
1.2ThreadLocal 内部是如何实现的?
1.set方法,
public void set(T value) {
//首先获取当前线程对象
Thread t = Thread.currentThread();
//获取线程中变量 ThreadLocal.ThreadLocalMap
ThreadLocalMap map = getMap(t);
//如果不为空,
if (map != null)
//每次调用set方法,都会检查是否key == null的值,因为key在ThreadLocal中定义的弱引用,很容易被GC掉,而value是强引用,容易造成内存泄露
map.set(this, value);
else
//如果为空,初始化该线程对象的map变量,其中key 为当前的threadlocal 变量
createMap(t, value);
}
/**
* Create the map associated with a ThreadLocal. Overridden in
* InheritableThreadLocal.
*
* @param t the current thread
* @param firstValue value for the initial entry of the map
*/
//初始化线程内部变量 threadLocals ,key 为当前 threadlocal
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
/**
* Construct a new map initially containing (firstKey, firstValue).
* ThreadLocalMaps are constructed lazily, so we only create
* one when we have at least one entry to put in it.
*/
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
table = new Entry[INITIAL_CAPACITY];
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
table[i] = new Entry(firstKey, firstValue);
size = 1;
setThreshold(INITIAL_CAPACITY);
}
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
ThreadLocalMap中的set(),在设置value时,会遍历Entry数组,如果
private void set(ThreadLocal<?> key, Object value) {
// We don't use a fast path as with get() because it is at
// least as common to use set() to create new entries as
// it is to replace existing ones, in which case, a fast
// path would fail more often than not.
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
if (k == key) {
e.value = value;
return;
}
if (k == null) {
//如果k == null,则会回收
replaceStaleEntry(key, value, i);
return;
}
}
tab[i] = new Entry(key, value);
int sz = ++size;
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
replaceStaleEntry(key, value, i)的源码
//1.前向搜索,出现key为null的Entry肯定是因为上次GC了,而之所以去前向搜索,是因为很有可能其它Entry在上次GC中也没能存活。另外并不是【相邻位置有很大概率会出现stale entry】,而是因为它只能一个个遍历,所以从【相邻】的位置开始遍历
// 2.向后遍历,是因为ThreadLocal用的是开地址,很可能当前的stale entry对应的并不是hascode为此槽索引的Entry,而是因为哈希冲突后移的Entry,那么很有可能hascode对应该槽的Entry会往后排。基于Thread Local Map中不允许有两个槽指向同一个引用的原则,如果存在那个hascode对应本槽但是在后面排列的Entry,则要【向后遍历】找到它,并且替换至本槽。否则直接设置值就会在Thread Local Map中存在两个指向一个ThreadLocal引用的槽
private void replaceStaleEntry(ThreadLocal<?> key, Object value,
int staleSlot) {
Entry[] tab = table;
int len = tab.length;
Entry e;
// Back up to check for prior stale entry in current run.
// We clean out whole runs at a time to avoid continual
// incremental rehashing due to garbage collector freeing
// up refs in bunches (i.e., whenever the collector runs).
int slotToExpunge = staleSlot;
for (int i = prevIndex(staleSlot, len);
(e = tab[i]) != null;
i = prevIndex(i, len))
if (e.get() == null)
slotToExpunge = i;
// Find either the key or trailing null slot of run, whichever
// occurs first
for (int i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
// If we find key, then we need to swap it
// with the stale entry to maintain hash table order.
// The newly stale slot, or any other stale slot
// encountered above it, can then be sent to expungeStaleEntry
// to remove or rehash all of the other entries in run.
if (k == key) {
e.value = value;
tab[i] = tab[staleSlot];
tab[staleSlot] = e;
// Start expunge at preceding stale entry if it exists
if (slotToExpunge == staleSlot)
slotToExpunge = i;
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}
// If we didn't find stale entry on backward scan, the
// first stale entry seen while scanning for key is the
// first still present in the run.
if (k == null && slotToExpunge == staleSlot)
slotToExpunge = i;
}
// If key not found, put new entry in stale slot
//将value设置为null,方便垃圾回收
tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);
// If there are any other stale entries in run, expunge them
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}
1.3. 为什么key使用弱引用?
如果使用强引用,当ThreadLocal
对象的引用(强引用)被回收了,ThreadLocalMap
本身依然还持有ThreadLocal
的强引用,如果没有手动删除这个key ,则ThreadLocal
不会被回收,所以只要当前线程不消亡,ThreadLocalMap
引用的那些对象就不会被回收, 可以认为这导致Entry
内存泄漏。
1.4总结
ThreadLocal 它是一个数据结构,有点像HashMap,可以保存"key : value"键值对,但是一个ThreadLocal只能保存一个,并且各个线程的数据互不干扰。在线程1中初始化了一个ThreadLocal对象localName,并通过set方法,保存了一个值value,同时在线程1中通过localName.get()
可以拿到之前设置的值,但是如果在线程2中,拿到的将是一个null。同时ThreadLocal在每次进行get() 和set()时都会进行一次遍历,对于key == null的 调用replaceStaleEntry()方法,将value 设置为null,方便垃圾回收,但是还是会有内存泄露的风险,最保险的方法在使用ThreadLocal之后,调用remove()方法
2.linux的epoll机制
具体详情可以参考这个:深入理解 Linux 的 epoll 机制 - 腾讯云开发者社区-腾讯云 (tencent.com)
2.1 epoll 机制的定义
epoll机制是属于linux中的一种非阻塞式的IO多路复用,对于linux 系统来说,一切皆是文件,我们操作文件首先拿到文件描述符(fd),然后通过建立epoll池管理这些fd,并且将epoll池中循环设置为阻塞式的,epoll池采用红黑树的数据结构,能够在对fd的高强度的增加、删除操作下也能保持良好的查找效率,通过linux内核提供的epoll_ctl 工具,在数据准备好之后,立马调用文件的回调,回调的时候自动把fd的相关结构体放到指定队列中,并唤醒操作系统。
正文
1.1消息的定义
通信的话,就像寄信一样,你首先要有邮件,那么先定义消息是什么样子的,然后再定义快递员,信件的包含的内容一般有邮件编号,目的地,内容,我们不难定义出这样的类
public class Message{
//what 相当邮件编号,当发送给同一个收件人多封信件时,让收件人做出区分
public int what;
//arg1,arg2 相当于信件的内容
public int arg1;
public int arg2;
//目的地,用于消息的分发
Handler target;
//构造方法
public Message() {
}
}
1.2消息创建和回收
消息的创建通常是通过构造函数来创建,但是通常我们要处理的消息,不仅来自于用户,还包含系统自己创建的消息,如果只通过构造函数来创建,因为每个App进程分配的内存是有限的,频繁创建对象,可能导致系统频繁GC,造成系统卡顿。
那么我们要考虑一下使用什么样的数据结构?使用数组的话,消息的插入和删除,需要一大块连续内存,而且消息的数量不确定,同时有可能会导致数组的大规模移动,降低效率,不考虑;使用链表的话,插入和删除,都比较方便,查询的话,每次处理消息都是从链表头部取出消息,不需要进行特殊的查询,所以我们使用链表作为消息的数据结构,那么在Message类中,增加一个next指针,同时更好的复用消息,我们通过池化的方式对消息进行处理
public class Message{
//what 相当邮件编号,当发送给同一个收件人多封信件时,让收件人做出区分
public int what;
//arg1,arg2 相当于信件的内容
public int arg1;
public int arg2;
//目的地,用于消息的分发
Handler target;
//下一个消息的指针
Message next;
//构造方法
public Message() {
}
}
下面是消息的创建,常见的方式有:
//方式1
val message = Message()
//方式2
val message = Handler().obtainMessage()
//方式3
val message = Message.obtain()
3种方式都是在内存中创建message对象,区别在于方式2和方式3,通过next指针,构建了一个消息池,上限是50个,方式2最终还是调用方式3,系统会自动回收message对象,但这里的回收不是直接将message对象置为null,而是通过将message对象中所有信息全部清空,然后将它添加到系统的消息池中,实际使用尽量采用方式2和方式3,避免频繁创建对象
/**
* Return a new Message instance from the global pool. Allows us to
* avoid allocating new objects in many cases.
*/
public static Message obtain() {
synchronized (sPoolSync) {
if (sPool != null) {
Message m = sPool;
sPool = m.next;
m.next = null;
m.flags = 0; // clear in-use flag
sPoolSize--;
return m;
}
}
return new Message();
}
下面是消息的回收,通过将消息中的what,arg1,arg2等数据设置为默认初始值,实现回收,
public void recycle() {
if (isInUse()) {
if (gCheckRecycle) {
throw new IllegalStateException("This message cannot be recycled because it "
+ "is still in use.");
}
return;
}
recycleUnchecked();
}
void recycleUnchecked() {
// Mark the message as in use while it remains in the recycled object pool.
// Clear out all other details.
//标记为未被是使用
flags = FLAG_IN_USE;
what = 0;
arg1 = 0;
arg2 = 0;
obj = null;
replyTo = null;
sendingUid = UID_NONE;
workSourceUid = UID_NONE;
when = 0;
target = null;
callback = null;
data = null;
synchronized (sPoolSync) {
if (sPoolSize < MAX_POOL_SIZE) {
//sPool相当于链表的尾指针
next = sPool;
sPool = this;
sPoolSize++;
}
}
}
1.3 消息的入队
先重写一个Handler
private class MyHandler(val wrActivity:WeakReference<SecondActivity>):Handler(Looper.getMainLooper()){
override fun handleMessage(msg: Message) {
super.handleMessage(msg)
wrActivity.get()?.run{
when(msg.what){
WHAT -> {
if(msg.arg1 == 1){
Log.e("handleMessage","arg1")
}else{
Log.e("handleMessage","arg2")
}
}
else ->{
Log.d("handleMessage","else")
}
}
}
}
}
//调用
val secondHandler = MyHandler(WeakReference(this))
val message = Message.obtain()
message.what = WHAT
message.arg1 = 1000
// secondHandler.post {
// Log.e("post", Thread.currentThread().name)
// binding.tvSecond.text = "post1---1"
// }
secondHandler.sendMessage(message)
进入Handler的sendMessage()方法
public final boolean sendMessage(@NonNull Message msg) {
return sendMessageDelayed(msg, 0);
}
进入sendMessageDelayed()
public final boolean sendMessageDelayed(@NonNull Message msg, long delayMillis) {
if (delayMillis < 0) {
delayMillis = 0;
}
return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}
进入sendMessageAtTime()
public boolean sendMessageAtTime(@NonNull Message msg, long uptimeMillis) {
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
return enqueueMessage(queue, msg, uptimeMillis);
}
进入enqueueMessage(),通过msg.target 设置目标的handler,方便在消息取出之后,调用重写的回调方法
private boolean enqueueMessage(@NonNull MessageQueue queue, @NonNull Message msg,
long uptimeMillis) {
//
msg.target = this;
msg.workSourceUid = ThreadLocalWorkSource.getUid();
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
最终调用的是MessageQueue.enqueueMessage()
boolean enqueueMessage(Message msg, long when) {
//如果没有target,表明没有handler报错
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}
synchronized (this) {
if (msg.isInUse()) {
throw new IllegalStateException(msg + " This message is already in use.");
}
if (mQuitting) {
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
Log.w(TAG, e.getMessage(), e);
msg.recycle();
return false;
}
msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
//如果消息队列中,没有其他消息,则作为第一个消息直接插入头部
if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// Inserted within the middle of the queue. Usually we don't have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
//开启循环,根据新插入消息的when,进行比较,如果比当前消息小,或者到达队尾,直接跳出循环
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
//将p插入消息队列中
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// We can assume mPtr != 0 because mQuitting is false.
//唤醒系统处理消息
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}
1.4 消息的取出
消息的调度由MessageQueue这个类来实现,同时这个类被Looper持有,MessageQueue不要被它的名字所骗了,不是队列,只是负责消息的入队和取出,这个类是线程私有的,通过ThreadLocal和当前线程绑定在一起,当前线程持有Looper,而Looper持有MessageQueue,通过Looper.loop()中的死循环,不断进行轮询,通过MessageQueue取出消息。在App启动时,系统已经帮我们在主线程开启了消息轮询,如果想在我们自己的子线程中开启消息轮询,则需要手动调用Looper.prepare() 和Looper.quit()
下面来看下系统是怎么帮我们在主线程中开启消息轮询的,在类ActivityThread中存在一个main方法,它就是我们App的入口
public static void main(String[] args) {
//....省略其他代码
Looper.prepareMainLooper();
thread.attach(false, startSeq);
//....省略其他代码
//Looper在主线程开启消息循环
Looper.loop();
throw new RuntimeException("Main thread loop unexpectedly exited");
}
跟踪Looper.prepareMainLooper()发现
@Deprecated
public static void prepareMainLooper() {
//创建一个不允许退出的消息循环
prepare(false);
synchronized (Looper.class) {
if (sMainLooper != null) {
throw new IllegalStateException("The main Looper has already been prepared.");
}
//并将Looper类中的sMainLooper 通过ThreadLocal获取到主线程的looper
sMainLooper = myLooper();
}
}
跟踪prepare(false),这里通过前面提到ThreadLocal来获取当前线程的Looper,如果当前线程已经创建了Looper,则抛出异常,每个线程最多只能存在一个Looper,
如果当前主线程没有创建Looper,则创建Looper,并以键值对的形式存储在ThreadLocal中
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
//创建主线程的Looper,并存储在ThreadLocal中
sThreadLocal.set(new Looper(quitAllowed));
}
最后ActivityThread在做完其他准备工作之后,就会在调用Looper.loop()开启消息循环,从中拿出消息并进行处理。
消息处理的入口是Looper.loop()
public static void loop() {
//通过ThreadLocal 取出 looper
final Looper me = myLooper();
if (me == null) {
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}
//如果Looper已经开始了循环,再次调用本方法可能使得排队的消息比正在处理的消息先执行
if (me.mInLoop) {
Slog.w(TAG, "Loop again would have the queued messages be executed"
+ " before this one completed.");
}
//设置Looper的状态为正在循环
me.mInLoop = true;
// Make sure the identity of this thread is that of the local process,
// and keep track of what that identity token actually is.
Binder.clearCallingIdentity();
final long ident = Binder.clearCallingIdentity();
// Allow overriding a threshold with a system prop. e.g.
// adb shell 'setprop log.looper.1000.main.slow 1 && stop && start'
final int thresholdOverride =
SystemProperties.getInt("log.looper."
+ Process.myUid() + "."
+ Thread.currentThread().getName()
+ ".slow", 0);
me.mSlowDeliveryDetected = false;
//这里开启了死循环,如果loopOnce()返回false 则跳出循环,如果跳出循环了,是如何唤醒的
for (;;) {
if (!loopOnce(me, ident, thresholdOverride)) {
return;
}
}
}
点击loopOnce(),其主要作用是轮询并传递单个消息,当返回false,则让上面的循环跳出
private static boolean loopOnce(final Looper me,
final long ident, final int thresholdOverride) {
//可能会堵塞,在这里调用MessageQueue的next()
Message msg = me.mQueue.next();
if (msg == null) {
// 没有消息代表消息队列正在退出
return false;
}
// This must be in a local variable, in case a UI event sets the logger
final Printer logging = me.mLogging;
//监测卡顿
if (logging != null) {
logging.println(">>>>> Dispatching to " + msg.target + " "
+ msg.callback + ": " + msg.what);
}
//使用final 确保观察者在处理事务时不会改变
// Make sure the observer won't change while processing a transaction.
final Observer observer = sObserver;
final long traceTag = me.mTraceTag;
//消息分发延误阈值
long slowDispatchThresholdMs = me.mSlowDispatchThresholdMs;
//消息处理延误阈值
long slowDeliveryThresholdMs = me.mSlowDeliveryThresholdMs;
if (thresholdOverride > 0) {
slowDispatchThresholdMs = thresholdOverride;
slowDeliveryThresholdMs = thresholdOverride;
}
final boolean logSlowDelivery = (slowDeliveryThresholdMs > 0) && (msg.when > 0);
final boolean logSlowDispatch = (slowDispatchThresholdMs > 0);
final boolean needStartTime = logSlowDelivery || logSlowDispatch;
final boolean needEndTime = logSlowDispatch;
if (traceTag != 0 && Trace.isTagEnabled(traceTag)) {
Trace.traceBegin(traceTag, msg.target.getTraceName(msg));
}
final long dispatchStart = needStartTime ? SystemClock.uptimeMillis() : 0;
final long dispatchEnd;
Object token = null;
if (observer != null) {
token = observer.messageDispatchStarting();
}
long origWorkSource = ThreadLocalWorkSource.setUid(msg.workSourceUid);
try {
//这里进行了回调,msg.target 就是你重写的Handler,
//在你sendMessage时,通过
msg.target.dispatchMessage(msg);
if (observer != null) {
observer.messageDispatched(token, msg);
}
dispatchEnd = needEndTime ? SystemClock.uptimeMillis() : 0;
} catch (Exception exception) {
if (observer != null) {
observer.dispatchingThrewException(token, msg, exception);
}
throw exception;
} finally {
ThreadLocalWorkSource.restore(origWorkSource);
if (traceTag != 0) {
Trace.traceEnd(traceTag);
}
}
if (logSlowDelivery) {
if (me.mSlowDeliveryDetected) {
if ((dispatchStart - msg.when) <= 10) {
Slog.w(TAG, "Drained");
me.mSlowDeliveryDetected = false;
}
} else {
if (showSlowLog(slowDeliveryThresholdMs, msg.when, dispatchStart, "delivery",
msg)) {
// Once we write a slow delivery log, suppress until the queue drains.
me.mSlowDeliveryDetected = true;
}
}
}
if (logSlowDispatch) {
//记录分发延误的时间
showSlowLog(slowDispatchThresholdMs, dispatchStart, dispatchEnd, "dispatch", msg);
}
if (logging != null) {
logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
}
// Make sure that during the course of dispatching the
// identity of the thread wasn't corrupted.
final long newIdent = Binder.clearCallingIdentity();
if (ident != newIdent) {
Log.wtf(TAG, "Thread identity changed from 0x"
+ Long.toHexString(ident) + " to 0x"
+ Long.toHexString(newIdent) + " while dispatching to "
+ msg.target.getClass().getName() + " "
+ msg.callback + " what=" + msg.what);
}
//回收消息
msg.recycleUnchecked();
return true;
}
跟踪进入MessageQueue.next()的方法
// Return here if the message loop has already quit and been disposed.
// This can happen if the application tries to restart a looper after quit
// which is not supported.
final long ptr = mPtr;
if (ptr == 0) {
return null;
}
int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
//执行native层消息机制层,
//timeOutMillis参数为超时等待时间。如果为-1,则表示无限等待,直到有事件发生为止。
//如果值为0,则无需等待立即返回。该方法可能会阻塞
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
//如果msg.target == null 则为消息屏障,直到找到下一个异步消息
if (msg != null && msg.target == null) {
// Stalled by a barrier. Find the next asynchronous message in the queue.
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
// //获取消息,判断等待时间,如果还需要等待则等待相应时间后唤醒
if (msg != null) {
//判断当前消息时间,是不是比当前时间大,计算时间差
if (now < msg.when) {
// Next message is not ready. Set a timeout to wake up when it is ready.
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// 不需要等待时间或者等待时间已经到了,那么直接返回该消息
// Got a message.
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse();
return msg;
}
} else {
// 没有更多消息了
nextPollTimeoutMillis = -1;
}
//处理完所有待处理的消息后,在处理退出消息
// Process the quit message now that all pending messages have been handled.
if (mQuitting) {
dispose();
return null;
}
// If first time idle, then get the number of idlers to run.
// Idle handles only run if the queue is empty or if the first message
// in the queue (possibly a barrier) is due to be handled in the future.
//如果第一次空闲,则获取要运行的空闲器的数量。
// 空闲句柄仅在队列为空或第一条消息时运行
// 在队列中(可能是一个障碍)将在未来被处理
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
// No idle handlers to run. Loop and wait some more.
mBlocked = true;
continue;
}
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
// Run the idle handlers.
// We only ever reach this code block during the first iteration.
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler
boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
// Reset the idle handler count to 0 so we do not run them again.
pendingIdleHandlerCount = 0;
// While calling an idle handler, a new message could have been delivered
// so go back and look again for a pending message without waiting.
nextPollTimeoutMillis = 0;
}
}
1.5同步屏障
一般来说,Message 都是按照when 进行排列,使用next 指向下一个消息,按照when的大小进行排序,when越小,排在队列的前面
同步屏障就是在消息队列中插入了一个屏障,在屏障之后的所有普通消息都会被拦着,不能被处理,不过异步消息却不会被消息屏障所阻拦,可以处理,
因此可以认为,消息屏障是为了确保异步消息的优先级,设置了屏障之后,只能处理队列中的异步消息,同步消息会被挡住,除非撤销屏障
怎么添加同步屏障?
通过调用MessageQueue.postSyncBarrier()
private int postSyncBarrier(long when) {
// Enqueue a new sync barrier token.
//我们不需要唤醒队列,因为屏障的目的是阻止它
// We don't need to wake the queue because the purpose of a barrier is to stall it.
synchronized (this) {
//获取屏障的唯一标识,标识从0开始,自加1
final int token = mNextBarrierToken++;
//从消息池中获取一个msg,设置消息为正在使用状态,并且重置msg的when和 arg1,arg1 的值设置为token,
//但是这里并没有给target赋值,所以msg的target是否为空,是判断这个msg是否屏障消息的标志
final Message msg = Message.obtain();
msg.markInUse();
msg.when = when;
msg.arg1 = token;
Message prev = null;
//指向消息队列中的第一消息
Message p = mMessages;
if (when != 0) {
while (p != null && p.when <= when) {
prev = p;
p = p.next;
}
}
//这里进行插入,如果pre不是指向消息的头部,将消息插入队列中
if (prev != null) { // invariant: p == prev.next
msg.next = p;
prev.next = msg;
} else {
//直接插入消息的头部
msg.next = p;
mMessages = msg;
}
return token;
}
}
消息屏障如何工作的?
Handler 对消息的处理,需要通过Looper.loop()不断执行MessageQueue.next()从中取出消息,在取出消息的时候,如果遇到消息屏障,则取出消息时会直接绕过普通消息,直接寻找消息队列中的下一个异步消息,从下面可以看到,msg.target == null 说明此时的消息是屏障消息,此时进入到循环,遍历移动msg的位置,直到移动到的msg 是异步message则退出循环,这也就意味着所有同步消息会被过滤掉
Handler中的消息分为:普通消息(同步消息)、异步消息、屏障消息)
@UnsupportedAppUsage
Message next() {
// Return here if the message loop has already quit and been disposed.
// This can happen if the application tries to restart a looper after quit
// which is not supported.
final long ptr = mPtr;
if (ptr == 0) {
return null;
}
int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
//msg.target == null 表明遇到消息屏障
if (msg != null && msg.target == null) {
//被一道屏障挡住了,寻找消息队列中下一个异步消息
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
if (now < msg.when) {
// Next message is not ready. Set a timeout to wake up when it is ready.
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// Got a message.
mBlocked = false;
if (prevMsg != null) {
//将消息从消息队列中移除
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse();
return msg;
}
} else {
// No more messages.
nextPollTimeoutMillis = -1;
}
//....省略其他代码
}
}
移除同步屏障
删除屏障消息的方法比较简单,就是遍历消息链表,如果p.target == null 或者p.arg1 == token ,则说明p 是屏障消息,将p删除并且回收
public void removeSyncBarrier(int token) {
// Remove a sync barrier token from the queue.
// If the queue is no longer stalled by a barrier then wake it.
synchronized (this) {
Message prev = null;
Message p = mMessages;
// 循环遍历,直到遇到屏障消息时推退出循环
while (p != null && (p.target != null || p.arg1 != token)) {
prev = p;
p = p.next;
}
if (p == null) {
throw new IllegalStateException("The specified message queue synchronization "
+ " barrier token has not been posted or has already been removed.");
}
final boolean needWake;
if (prev != null) {
// 删除屏障消息p
prev.next = p.next;
needWake = false;
} else {
mMessages = p.next;
needWake = mMessages == null || mMessages.target != null;
}
p.recycleUnchecked();
// If the loop is quitting then it is already awake.
// We can assume mPtr != 0 when mQuitting is false.
if (needWake && !mQuitting) {
nativeWake(mPtr);
}
}
}
1.6 IdleHandler
IdleHandler 是Handler机制提供的一种可以在Looper事件循环的过程中,当出现空闲时,允许我们执行任务的一种机制,它被定义在MessageQueue中,作为一个接口
// MessageQueue.java
public static interface IdleHandler {
boolean queueIdle();
}
需要实现queueIdle() 方法,同时返回值为true时表示一个持久的IdleHandler会被重复使用,返回false表示一个一次性的IdleHandler,MessageQueue类中提供了相应的add() 和remove(),其中mIdleHandler是一个arrayList
// MessageQueue.java,
public void addIdleHandler(@NonNull IdleHandler handler) {
// ...
synchronized (this) {
mIdleHandlers.add(handler);
}
}
public void removeIdleHandler(@NonNull IdleHandler handler) {
synchronized (this) {
mIdleHandlers.remove(handler);
}
}
mIdleHandler时的执行时机一般有两种情况,一种是MessageQueue为空,没有消息
第二种是有消息队列,但是没有到执行之间,message.when > currentTime,需要滞后执行
mIdleHandler的
Message next() {
// ...
int pendingIdleHandlerCount = -1;
int nextPollTimeoutMillis = 0;
for (;;) {
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
// ...
if (msg != null) {
if (now < msg.when) {
// 计算休眠的时间
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// Other code
// 找到消息处理后返回
return msg;
}
} else {
// 没有更多的消息
nextPollTimeoutMillis = -1;
}
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
mBlocked = true;
continue;
}
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null;
boolean keep = false;
try {
//执行idler.queueIdle()中的回调方法
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}
//如果是一次性的handler,直接移除
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
//通过重置pendingIdleHandlerCount 和nextPollTimeoutMillis ,避免再次进入
pendingIdleHandlerCount = 0;
nextPollTimeoutMillis = 0;
}
}
复制代码
大概流程是这样的:
- 如果本次循环拿到的Message为空,或者!这个Message是一个延时的消息而且还没到指定的触发时间,那么,就认定当前的队列为空闲状态,
- 接着就会遍历mPendingIdleHandlers数组(这个数组里面的元素每次都会到mIdleHandlers中去拿)来调用每一个IdleHandler实例的queueIdle方法,
- 如果这个方法返回false的话,那么这个实例就会从mIdleHandlers中移除,也就是当下次队列空闲的时候,不会继续回调它的queueIdle方法了。
处理完IdleHandler后会将nextPollTimeoutMillis设置为0,也就是不阻塞消息队列,当然要注意这里执行的代码同样不能太耗时,因为它是同步执行的,如果太耗时肯定会影响后面的message执行。
使用场景:
1.Activity启动优化(加快App启动速度):onCreate,onStart,onResume中耗时较短但非必要的代码可以放到IdleHandler中执行,减少启动时间
2.想要在一个View绘制完成之后添加其他依赖于这个View的View,当然这个用View#post()也能实现,区别就是前者会在消息队列空闲时执行
3.发送一个返回true的IdleHandler,在里面让某个View不停闪烁,这样当用户发呆时就可以诱导用户点击这个View,这也是种很酷的操作
4.一些第三方库中有使用,比如LeakCanary,Glide中有使用到,具体可以自行去查看
1.7 Handler的内存泄露
场景1:
public class MainActivity extends AppCompatActivity {
private Handler mHandler = new Handler(){
@Override
public void handleMessage(@NonNull Message msg) {
// 处理数据
}
};
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
// 后台耗时任务
threadWork();
}
private void threadWork() {
// 耗时任务
// ...
// 发送数据
Message message = Message.obtain();
mHandler.sendMessage(message);
}
}
//分析:当使用内部类(包括匿名类)来创建Handler的时候,Handler对象会隐式地持有一个外部类对象(通常是一个Activity)的引用。而Handler通常会伴随着一个耗时的后台线程(例如从网络拉取图片)一起出现,这个后台线程在任务执行完毕(例如图片下载完毕)之后,通过消息机制通知Handler,然后Handler把图片更新到界面。然而,如果用户在网络请求过程中关闭了Activity,正常情况下,Activity不再被使用,它就有可能在GC检查时被回收掉,但由于这时线程尚未执行完,而该线程持有Handler的引用(不然它怎么发消息给Handler?),这个Handler又持有Activity的引用,就导致该Activity无法被回收(即内存泄露),直到网络请求结束。
场景2:
new Handler().postDelayed(new Runnable() {
@Override
public void run() {
}
},5000);
//最终会调用MessageQueue.enqueueMessage()
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
// Message持有Handler的引用
msg.target = this;
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
//Message会持有一个对Handler的引用,当Handler为非静态内部类,会持有一个对Activity的引用,由于这个Message会长期存在队列中,就会导致Handler长期持有对Activity的引用,从而引起视图和资源泄露,
解决方法:
方式1:通过静态内部类+ 弱引用的方法
//使用静态内部类+弱引用的方法
private class MyHandler(val wrActivity:WeakReference<SecondActivity>):Handler(Looper.getMainLooper()){
override fun handleMessage(msg:Message){
super.handleMessage(msg)
wrActivity.get()?.run(){
when(msg.what){
WHAT->{
}
else ->{
}
}
}
}
}
方式2:及时回收Message
@Override
protected void onDestroy() {
super.onDestroy();
if (mHandler != null) {
mHandler.removeCallbacksAndMessages(null);
}
}
总结
Handler机制到这里已经完全串联起来了,整个流程最后总结起来其实确实很简单:
1、ActivityThread中,初始化Looper,Looper初始化时候会创建MessageQueue;Looper初始化完成之后调用loop()开启死循环,不断取出MessageQueue中的消息,并分发出去
2、Handler初始化后,将Looper,MessageQueue、Handler关联起来,并等待接受来自MessageQueue中的消息
3、Handler通过send相关方法,发送消息到MessageQueue,MessageQueue通过Message信息,将Message放到队列中相应位置,等待被取出使用;
4、Looper取出消息,并根据Message中的信息分发出去,给相应的Handler使用,此时Hanlder接收到消息,我们开始处理消息,进行更新UI等主线程的操作
参考文章
ThreadLocal,一篇文章就够了 - 知乎 (zhihu.com)
Java面试必问,ThreadLocal终极篇 - 简书 (jianshu.com)
https://juejin.cn/post/6844903916958662669
[(3条消息) IdleHandler 是什么?怎么使用,能解决什么问题?_一叶飘舟的博客-CSDN博客_idlehandler
[深入理解 Linux 的 epoll 机制 - 腾讯云开发者社区-腾讯云 (tencent.com)](https://blog.csdn.net/jdsjlzx/article/details/110532500)