前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Android 线程与消息机制源码分析

Android 线程与消息机制源码分析

作者头像
Yif
发布2019-12-26 14:48:05
3730
发布2019-12-26 14:48:05
举报
文章被收录于专栏:Android 进阶Android 进阶

1.handler、looper、messagequeue 之间关系

messagequeue是用来存储消息的载体,而lopper是无限循环查找这个载体中是否有消息, handler是创建消息并使用lopper来构建消息循环。 handler主要任务是将任务切换到指定线程中执行

2.handler工作原理

创建handler的工作线程中必须要有Looper并调用Looper.prepare,而主线程ActivityThread初始化默认创建Looper,它方法中调用了 Looper.prepareMainLooper()就无需在调用Loopr.prepare去创建Looper;handler通过post传递runnable或者send发送消息,消息传递给messagequeueequeueMessage方法中,而looper是一个无限循环从这个消息队列中取出消息,取出来消息后,消息中的runnnable或者handler中的handlerMessage方法就会调用,注意looper运行或者处理消息必须在创建handler的线程中。

代码语言:javascript
复制
public static void prepareMainLooper() {
    prepare(false);
    synchronized (Looper.class) {
        if (sMainLooper != null) {
            throw new IllegalStateException("The main Looper has already been prepared.");
        }
        sMainLooper = myLooper();
    }
}
private static void prepare(boolean quitAllowed) {
    if (sThreadLocal.get() != null) {
        throw new RuntimeException("Only one Looper may be created per thread");
    }
//调用ThreadLocal来获取每个线程的Looper,ThreadLocal在每个线程中存储数据,这些数据都互相不干扰
    sThreadLocal.set(new Looper(quitAllowed));
}

所以就可以在主线程中之间创建并初始化Handler而在子线程中需要

代码语言:javascript
复制
class LooperThread extends Thread {
*      public Handler mHandler;
*
*      public void run() {//创建looper
*          Looper.prepare();
*
*          mHandler = new Handler() {
*              public void handleMessage(Message msg) {
*                  // process incoming messages here
*              }
*          };
*//开启消息循环
*          Looper.loop();
*      }
*  }

3.ThreadLocal

ThreadLocal是用来在指定线程中存储数据,保证一个数据在不同线程中都保持一个独立的值互不干扰。每一个线程单独持有,因为每一个线程都有独立的变量副本,其他线程不能访问,不存在线程安全问题,也不会影响线程执行效率。ThreadLocal对象通常是由private static修饰,因为都需要复制进入本地线程。需要注意,ThreadLocal无法解决共享对象的更新问题。 通过 ThreadLocal 创建的线程变量,其子线程是无法继承的。也就是说你在线程中通过 ThreadLocal 创建了线程变量 V,而后该线程创建了子线程,你在子线程中是无法通过 ThreadLocal 来访问父线程的线程变量 V 的。如果你需要子线程继承父线程的线程变量,那该怎么办呢?其实很简单,Java 提供了 InheritableThreadLocal 来支持这种特性,InheritableThreadLocalThreadLocal 子类,所以用法和 ThreadLocal 相同,但是不建议在线程池中使用 InheritableThreadLocal,不仅仅是因为它具有 ThreadLocal 相同的缺点——可能导致内存泄露,更重要的原因是:线程池中线程的创建是动态的,很容易导致继承关系错乱,如果你的业务逻辑依赖 InheritableThreadLocal,那么很可能导致业务逻辑计算错误,而这个错误往往比内存泄露更要命。 ThreadLocal有个静态内部类叫ThreadLocalMap,它还有一个静态内部类叫Entry,它继承自weakreference,没有方法,只有一个value成员变量,它的keyThreadLocal 它们之间关系如下:

  1. 1个Thread有且仅有一个ThreadLocalMap对象
  2. 1个Entry对象的key弱引用指向1个ThreadLocal对象
  3. 1个ThreadLocalMap对象存储多个Entry对象
  4. 1个ThreadLocal对象可以被多个线程所共享
  5. ThreadLocal对象不持有value,value由线程Entry对象持有,是被Entry强引用,即使value生命周期结束了,也无法被回收,会导致内存泄漏

3.1 线程使用ThreadLocal有三个重要方法:

  1. set():如果没有set操作ThreadLocal,那么容易引起脏数据问题
  2. get():始终没有get操作ThreadLocal对象是没有意义的
  3. remove():如果没有该操作,容易引起内存泄漏,因为创建ThreadLocal用的是static final进行修饰

3.2 ThreadLocal主要两个问题是产生脏数据与内存泄漏**

  1. 脏数据:线程复用会产生脏数据,由于线程池会重用thread对象,那么与thread绑定的类静态属性ThreadLocal变量也会被重用。如果在实现线程run方法体中不显示调用remove清理与线程相关的ThreadLocal信息,那么下一个线程不调用set设置初始值,就可能get到重用的线程信息,包括ThreadLocal关联线程所对应的value值。
  2. 内存泄漏:使用static方法修饰,寄希望ThreadLocal对象失去引用,触发弱引用机制回收Entryvalue不现实。 综上解决这两个问题方法就是每次使用完ThreadLocal就及时调用remove方法清理即可
代码语言:javascript
复制
private static final ThreadLocal<Integer> mIntegerThreadLocal = new ThreadLocal<Integer>();
 9         mIntegerThreadLocal.set(0);
10         //输出结果为main thread threadlocal==0
11         System.out.println("main thread threadlocal==" + mIntegerThreadLocal.get());
12 
13         new Thread("Thread 1") {
14             @Override
15             public void run() {
16                 mIntegerThreadLocal.set(0);
17                 mIntegerThreadLocal.set(mIntegerThreadLocal.get() + 2);
18                 //输出结果为 Thread 1 threadlocal==2
19                 System.out.println("Thread 1 threadlocal==" + mIntegerThreadLocal.get());
20             }
21         }.start();
22 
23         new Thread("Thread 2") {
24             @Override
25             public void run() {
26                 //这里就会报空指针错误了 因为mIntegerThreadLocal 在这个线程中没有初始化他的值所以是null pointer
27                 mIntegerThreadLocal.set(mIntegerThreadLocal.get() + 2);
28                 System.out.println("Thread 1 threadlocal==" + mIntegerThreadLocal.get());
29             }
30         }.start();

源码分析:

代码语言:javascript
复制
public T get() {
    Thread t = Thread.currentThread();
//获取当前线程中的 threadLocals
    ThreadLocalMap map = getMap(t);
//如果有值,就直接获取对应的value
    if (map != null) {
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
//否则就返回一个初始化的值,这个值为空
    return setInitialValue();
}
private T setInitialValue() {
T value = initialValue();    
Thread t = Thread.currentThread();    
ThreadLocalMap map = getMap(t);    
if (map != null)map.set(this, value);    
else createMap(t, value);    
return value;
}
protected T initialValue() {
return null;
}
public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
//判断当前线程中的threadlocals是否为空,不为空设置值,否则创建threadlocalMap
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}
/**
 * Create the map associated with a ThreadLocal. Overridden in
 * InheritableThreadLocal.
 *
 * @param t the current thread
 * @param firstValue value for the initial entry of the map
 */
void createMap(Thread t, T firstValue) {
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
    table = new Entry[INITIAL_CAPACITY];
    int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
    table[i] = new Entry(firstKey, firstValue);
    size = 1;
    setThreshold(INITIAL_CAPACITY);
}
 
**从源码得出`ThreadLocal`的`set`与`get`方法就是以当前线程为`key`获取对应的`value`值。**

4.MessageQueue源码分析

equeueMessage对消息队列进行插入插入,内部是一个简单的单链表实现

代码语言:javascript
复制
boolean enqueueMessage(Message msg, long when) {    
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");    
}    
if (msg.isInUse()) {  
throw new IllegalStateException(msg + " This message is already in use.");   
}    
synchronized (this) {        
if (mQuitting) {           
IllegalStateException e = new IllegalStateException(                    msg.target + " sending message to a Handler on a dead thread");            Log.w(TAG, e.getMessage(), e);           
msg.recycle();            
return false;        
}        
msg.markInUse();        
msg.when = when;//mMessages单链表表头        
Message p = mMessages;       
boolean needWake;//第一个往里面插入消息,表头就变成第一个传进来的消息        
if (p == null || when == 0 || when < p.when) {// New head, wake up the event queue if blocked.           
msg.next = p;            
mMessages = msg;            
needWake = mBlocked;        } 
else {                
needWake = mBlocked && p.target == null && msg.isAsynchronous();           Message prev;            
for (;;) {              
prev = p;              
p = p.next;               
if (p == null || when < p.when) {                
break;                
}               
if (needWake && p.isAsynchronous()) {                   
needWake = false;              
}            
}           
msg.next = p; // invariant: p == prev.next           
prev.next = msg;        
}        
// We can assume mPtr != 0 because mQuitting is false.        
if (needWake) {           
nativeWake(mPtr);       
}   
}   
return true;
}
//next就是从消息队列中取出消息然后删除它**
Message next() {
    // Return here if the message loop has already quit and been disposed.
    // This can happen if the application tries to restart a looper after quit
    // which is not supported.
//next方法返回为空的条件是looper调用quit或quitsafely方法退出才会返回空
    final long ptr = mPtr;
    if (ptr == 0) {
        return null;
    }
    int pendingIdleHandlerCount = -1; // -1 only during first iteration
    int nextPollTimeoutMillis = 0;
//只有消息队列中没有消息它才会退出,否则会一直阻塞在这
    for (;;) {
        if (nextPollTimeoutMillis != 0) {
            Binder.flushPendingCommands();
        }
        nativePollOnce(ptr, nextPollTimeoutMillis);
        synchronized (this) {
            // Try to retrieve the next message.  Return if found.
            final long now = SystemClock.uptimeMillis();
            Message prevMsg = null;
            Message msg = mMessages;
            if (msg != null && msg.target == null) {
                // Stalled by a barrier.  Find the next asynchronous message in the queue.
                do {
                    prevMsg = msg;
                    msg = msg.next;
                } while (msg != null && !msg.isAsynchronous());
            }
            if (msg != null) {
                if (now < msg.when) {
                    // Next message is not ready.  Set a timeout to wake up when it is ready.
                    nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                } else {
                    // Got a message.
                    mBlocked = false;
                    if (prevMsg != null) {
                        prevMsg.next = msg.next;
                    } else {
                        mMessages = msg.next;
                    }
                    msg.next = null;
                    if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                    msg.markInUse();
                    return msg;
                }
            } else {
                // No more messages.
                nextPollTimeoutMillis = -1;
            }
 
            // Process the quit message now that all pending messages have been handled.
            if (mQuitting) {
                dispose();
                return null;
            }
 
            // If first time idle, then get the number of idlers to run.
            // Idle handles only run if the queue is empty or if the first message
            // in the queue (possibly a barrier) is due to be handled in the future.
            if (pendingIdleHandlerCount < 0
                    && (mMessages == null || now < mMessages.when)) {
                pendingIdleHandlerCount = mIdleHandlers.size();
            }
            if (pendingIdleHandlerCount <= 0) {
                // No idle handlers to run.  Loop and wait some more.
                mBlocked = true;
                continue;
            }
 
            if (mPendingIdleHandlers == null) {
                mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
            }
            mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
        }
 
        // Run the idle handlers.
        // We only ever reach this code block during the first iteration.
        for (int i = 0; i < pendingIdleHandlerCount; i++) {
            final IdleHandler idler = mPendingIdleHandlers[i];
            mPendingIdleHandlers[i] = null; // release the reference to the handler
 
            boolean keep = false;
            try {
                keep = idler.queueIdle();
            } catch (Throwable t) {
                Log.wtf(TAG, "IdleHandler threw exception", t);
            }
 
            if (!keep) {
                synchronized (this) {
                    mIdleHandlers.remove(idler);
                }
            }
        }
 
        // Reset the idle handler count to 0 so we do not run them again.
        pendingIdleHandlerCount = 0;
 
        // While calling an idle handler, a new message could have been delivered
        // so go back and look again for a pending message without waiting.
        nextPollTimeoutMillis = 0;
    }
}

5.Looper源码分析

代码语言:javascript
复制
/**
 * Run the message queue in this thread. Be sure to call
 * {@link #quit()} to end the loop.
 */
public static void loop() {
    final Looper me = myLooper();
    if (me == null) {
        throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
    }
    final MessageQueue queue = me.mQueue;
 
    // Make sure the identity of this thread is that of the local process,
    // and keep track of what that identity token actually is.
    Binder.clearCallingIdentity();
    final long ident = Binder.clearCallingIdentity();
 
    for (;;) {
//死循环,跳出条件是msg为空,即next()方法返回为空,而next()方法返回为空的唯一条件是Looper调用quit或者
quitSafely方法,Looper必须手动退出,否则Looper会无限循环下去。
        Message msg = queue.next(); // might block
        if (msg == null) {
            // No message indicates that the message queue is quitting.
            return;
        }
 
        // This must be in a local variable, in case a UI event sets the logger
        final Printer logging = me.mLogging;
        if (logging != null) {
            logging.println(">>>>> Dispatching to " + msg.target + " " +
                    msg.callback + ": " + msg.what);
        }
 
        final long slowDispatchThresholdMs = me.mSlowDispatchThresholdMs;
 
        final long traceTag = me.mTraceTag;
        if (traceTag != 0 && Trace.isTagEnabled(traceTag)) {
            Trace.traceBegin(traceTag, msg.target.getTraceName(msg));
        }
        final long start = (slowDispatchThresholdMs == 0) ? 0 : SystemClock.uptimeMillis();
        final long end;
        try {
            msg.target.dispatchMessage(msg);
            end = (slowDispatchThresholdMs == 0) ? 0 : SystemClock.uptimeMillis();
        } finally {
            if (traceTag != 0) {
                Trace.traceEnd(traceTag);
            }
        }
        if (slowDispatchThresholdMs > 0) {
            final long time = end - start;
            if (time > slowDispatchThresholdMs) {
                Slog.w(TAG, "Dispatch took " + time + "ms on "
                        + Thread.currentThread().getName() + ", h=" +
                        msg.target + " cb=" + msg.callback + " msg=" + msg.what);
            }
        }
 
        if (logging != null) {
            logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
        }
 
        // Make sure that during the course of dispatching the
        // identity of the thread wasn't corrupted.
        final long newIdent = Binder.clearCallingIdentity();
        if (ident != newIdent) {
            Log.wtf(TAG, "Thread identity changed from 0x"
                    + Long.toHexString(ident) + " to 0x"
                    + Long.toHexString(newIdent) + " while dispatching to "
                    + msg.target.getClass().getName() + " "
                    + msg.callback + " what=" + msg.what);
        }
 
        msg.recycleUnchecked();
    }
}

Looper的quit方法

代码语言:javascript
复制
public void quit() {
//调用的是MessageQueue中的quit方法
    mQueue.quit(false);
}

MessageQueue的quit方法

代码语言:javascript
复制
void quit(boolean safe) {   
if (!mQuitAllowed) {        
throw new IllegalStateException("Main thread not allowed to quit.");   
}    
synchronized (this) {      
if (mQuitting) {          
return;        
}       
mQuitting = true;       //根据是否安全退出调用不同的退出方法       
if (safe) {          
removeAllFutureMessagesLocked();       
} else {           
removeAllMessagesLocked();      
}        
// We can assume mPtr != 0 because mQuitting was previously false
nativeWake(mPtr);    
}
}
 
**Looper的quitSafely方法**
​```java
public void quitSafely() {
    mQueue.quit(true);
}

6.使用Looper需要注意什么

不需要Looper时,必须调用quit或者quitSafely方法终止Looper循环,否则会造成内存泄漏会其它问题。

7.thread与handlerThread区别

区别在与run方法中,threadrun方法就是开启一个耗时任务,而handlerthread中开启消息队列

代码语言:javascript
复制
@Override
public void run() {
    mTid = Process.myTid();
    Looper.prepare();
    synchronized (this) {
        mLooper = Looper.myLooper();
        notifyAll();
    }
    Process.setThreadPriority(mPriority);
    onLooperPrepared();
//开启消息队列循环,不用时需要quit
    Looper.loop();
    mTid = -1;
}

8.Android中为什么主线程不会因为Looper.loop()里的死循环卡死?

handler机制内部采用Linuxpipe/epoll机制实现是一种IO多路复用机制 主线程没有消息即阻塞在管道读端,处于休眠状态,如果消息队列中有消息,就会往管道写端写入内容,唤醒正在等待消息的主线程 binder线程会往主线程消息队列里添加消息,然后往管道写端写一个字节,这样就能唤醒主线程从管道读端返回,也就是说queue.next()会调用返回。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019年7月16日 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.handler、looper、messagequeue 之间关系
  • 2.handler工作原理
  • 3.ThreadLocal
    • 3.1 线程使用ThreadLocal有三个重要方法:
      • 3.2 ThreadLocal主要两个问题是产生脏数据与内存泄漏**
      • 4.MessageQueue源码分析
      • 5.Looper源码分析
      • 6.使用Looper需要注意什么
      • 7.thread与handlerThread区别
      • 8.Android中为什么主线程不会因为Looper.loop()里的死循环卡死?
      相关产品与服务
      消息队列 CMQ 版
      消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档