概述
下面均使用并模拟了两个task2:一个在Future中抛出一次异常的task1(Future使用了onComplete)、一个在Future内外均各抛出一次异常的task2(Future使用了onComplete)
package io.growing.schedule
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.Failure
/**
*
* @author liguobin@growingio.com
* @version 1.0,2019/9/17
*/
object UserTask {
//仅load内部抛出运行时异常
def task1(): Unit = {
println("我开始执行任务1")
load(1).map { x =>
println("任务1-我在执行更新数据 x: " + x)
}
}
private var j = 0
//load内部与外部各抛一次运行时异常
def task2() = {
println("我开始执行任务2")
load(2).map { x =>
println("任务2-我在执行更新数据 x: " + x)
}
j += 1
//模拟仅抛出一次异常
if (j == 1) {
throw new Exception("Future 外抛出的异常")
}
}
//@see https://docs.scala-lang.org/zh-cn/overviews/core/futures.html
//当Future带着某个值就位时,我们就说该Future携带计算结果成功就位。
//当Future因对应计算过程抛出异常而就绪,我们就说这个Future因该异常而失败。
//如果此Future以异常完成,那么新的Future也将包含此异常
private var i = 0
private def load(name: Int) = {
val f = Future {
i += 1
if (i < 2) {
println(s"任务$name-我正在抛出异常")
throw new Exception("Future 内抛出的异常")
}
1
}
f.onComplete {
case Failure(t) => println(s"任务$name-load方法捕获到错误了")
case _ =>
}
f
}
}
Java Timer
Timer是一个比较老的库,从Java1.3版本开始的,并且是线程安全的,其内部均使用synchronized
基本原理
- 其内部将任务TimerTask(Runnable)传递进优先级队列数据结构TaskQueue,其中优先级以下次被执行时间为值构造小根堆(nextExecutionTime,由fixUp、fixDown进行上下调整)。
- 执行schedule方法时,task的下次执行时间已经根据传进的delay+当前系统时间戳计算好。并将TimerTask入队,然后获取小根堆的根节点,如果是当前节点则直接将它唤醒(根节点表示离将被执行时间最近的任务,如果小根堆是当前任务,那么当前任务就是马上被执行的任务)。
- 初始化Timer对象时,会启动TimerThread线程,并将整个任务队列传进去,所有待调度的任务在队列中等待被执行。TimerThread这个相当于整个任务的调度线程。
- TimerThread线程的run方法会执行mainLoop方法,整个方法是一个死循环,不停的从队列取出可执行的任务,被取消的任务将从队列中删除。其中执行时还会判断当前时间戳与任务入队时的nextExecutionTime,只有在这个时间小于等于当前时间戳时,任务才会开始被调度,否则不会被处理并被要求开始wait。在这个处理过程中mainLoop方法只会处理中断异常InterruptedException,任何其他异常都将导致任务被取消,并退出mainLoop方法的死循环,任务调度线程直接结束。
Timer主要特性总结
- Timer 是基于绝对时间的,对系统时间比较敏感
- Timer 是内部是单一线程,调度线程挂了任务全挂
- Timer 运行多个 TimeTask 时,只要其中之一没有捕获抛出的异常,其它任务便会自动终止运行,无法在异常出现后继续恢复执行
- Timer 执行线程默认不是 daemon 线程, 任务执行完,主线程(或其他启动定时器的线程)结束时,task 线程并没有结束。需要注意潜在内存泄漏问题
- 内部使用了大量的synchronized和锁来保证线程安全性
- Timer内部优先级队列大小默认是128个,满时扩容为当前容量的2倍,并向上调整
代码示例1
object TimerTest3 extends App {
pull()
lazy val timer = new Timer()
//这里使用main方法测试,使用val不会被初始化,除非加lazy @see https://www.jianshu.com/p/f99e09effea4
def LongServerTask: TimerTask = new TimerTask {
override def run(): Unit = {
UserTask.task1()
}
}
def pull(): Unit = {
timer.schedule(LongServerTask, 3000, 3000)
timer.schedule(updateHotDataTask, 3000, 3000)
}
def updateHotDataTask: TimerTask = new TimerTask {
override def run(): Unit = {
UserTask.task2()
}
}
}
结果1
task1和task2的Future中的异常都被吃掉了,task2的异常由于Future外部没有onComplete捕获,会导致task被取消,所有定时任务直接结束,main方法停止。
Java ScheduledExecutorService
基于上述Timer的设计缺陷以及功能上的不足,不引入第三方库的前提下,可以使用现有Java类库的ScheduledExecutorService类。该类是一个接口,从Java1.5开始,继承自ExecutorService接口
我们有两种方法创建:
- 通过Executors的静态方法newScheduledThreadPool来创建自己的定时任务,只需传入核心线程数量、(ThreadFactory可选)
- 通过new ScheduledThreadPoolExecutor实例来创建,同样也只需传入核心线程数量、(ThreadFactory可选),甚至可以使用自定义的拒绝策略。实际上Executors仅对ExecutorService的子类作了封装,以便更快更简单创建一个线程池
ScheduledThreadPoolExecutor的创建
两方法的比较:线程池不推荐使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的人更加明确线程池的运行规则,规避资源耗尽的风险
- newFixedThreadPool和newSingleThreadExecutor: 主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
- newCachedThreadPool和newScheduledThreadPool: 主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。 Scheduled线程池能按时间计划来执行任务,允许用户设定计划执行任务的时间,int类型的参数是设定 线程池中线程的最小数目。当任务较多时,线程池可能会自动创建更多的工作线程来执行任务
ScheduledThreadPoolExecutor的启动
- scheduleAtFixedRate 固定频率 下一次执行时间相当于是上一次的执行时间加上period
- scheduleWithFixedDelay 固定周期 下一次执行时间是上一次任务执行完的系统时间加上period,因而具体执行时间不是固定的,但周期是固定的,是采用相对固定的延迟来执行任务
ScheduledThreadPoolExecutor的优点
- 基于相对时间,即使是固定周期时也是基于纳秒的时间,比毫秒更加精确。
- 内部是个线程池,所以可以支持多个任务并发执行,Timer就一个任务调度线程
- 拥有拒绝策略,支持异常捕获后的处理,可自行扩展(只有通过execute提交的任务,才能将它抛出的异常交给UncaughtExceptionHandler,而通过submit提交的任务,无论是抛出的未检测异常还是已检查异常,都将被认为是任务返回状态的一部分,可以捕获Future.get)
- 使用可重入锁以及CAS而非synchronized关键字
代码示例2
package io.growing.schedule
import java.util.concurrent.{ Executors, _ }
/**
* @see https://www.jianshu.com/p/925dba9f5969
* @author liguobin@growingio.com
* @version 1.0,2019/9/16
*/
object ScheduledThreadPoolExecutorTest extends App {
pull()
lazy val delegate = Executors.defaultThreadFactory
lazy val tf = new ThreadFactory() {
override def newThread(r: Runnable): Thread = {
val res = delegate.newThread(r)
res.setUncaughtExceptionHandler((t: Thread, e: Throwable) => {
println("uncaughtException捕获到异常了")
e.printStackTrace()
})
res
}
}
lazy val executor = Executors.newScheduledThreadPool(5, tf)
lazy val task1 = new Runnable {
override def run(): Unit = {
UserTask.task1()
}
}
lazy val task2 = new Runnable {
override def run(): Unit = {
UserTask.task2()
}
}
def pull(): Unit = {
executor.scheduleWithFixedDelay(task1, 6, 6, TimeUnit.SECONDS)
executor.scheduleWithFixedDelay(task2, 6, 6, TimeUnit.SECONDS)
}
}
结果2
task1和task2的Future中的异常都被吃掉了,任务继续执行,task2的异常由于Future外部没有onComplete捕获,但不影响task1的执行,task1仍能继续工作,但task2无法被继续执行。基于此种方法更新缓存,需要自己实现重试策略。
Akka Actor scheduler
其原理使用是系统actor的scheduler调度器定时发送消息给其他actor以触发定时任务的执行。同样,akka定时任务也执行传入两个时间、第一次延迟时间和后续间隔时间,以及消息接受者和消息内容。
Akka2.5.3的官方文档:https://doc.akka.io/docs/akka/2.5.3/scala/scheduler.html,文档讲的很清楚,就不复制了。
代码示例3
package io.growing.schedule
import java.util.concurrent.TimeUnit
import akka.actor.{ Actor, ActorRef, ActorSystem, Props }
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
/**
*
* @author liguobin@growingio.com
* @version 1.0,2019/9/17
*/
class JobScheduler(jobActor: ActorRef, system: ActorSystem)(implicit ec: ExecutionContext) {
def start() {
//两个定时任务,task1,Future内部异常被捕获
// task2,Future内部异常被捕获,且Future外还抛出异常,两个task的异常都只会出现一次。
system.scheduler.schedule(0 seconds, Duration.create(1, TimeUnit.SECONDS), jobActor, "task1")
system.scheduler.schedule(0 seconds, Duration.create(3, TimeUnit.SECONDS), jobActor, "task2")
}
}
class JobActor extends Actor {
def receive = {
case "task1" => {
println("receive message from task1...")
UserTask.task1()
}
case "task2" => {
println("receive message from task2...")
UserTask.task2()
}
}
}
object TestJobScheduler extends App {
implicit val ec = ExecutionContext.global
val system = ActorSystem.apply("schedulerJob")
val jobActor = system.actorOf(Props[JobActor])
val schedule = new JobScheduler(jobActor, system)
schedule.start()
}
结果3
Future内抛出的异常被onComplete吃掉了,虽然外部的异常使得akka出错,但是下次任务仍会继续执行,符合需求预期。另外:错误次数呢?
补充
值得一提的是,根据官方文档说明,使用Runnable的定时任务在遇到异常时同样会取消当前task,与ScheduledThreadPoolExecutor表现一致
参见官方文档:https://doc.akka.io/docs/akka/2.5.3/scala/scheduler.html
下面这个定时任务实现方式,一旦task2中抛出未被捕获的异常,整个schedule就不会再被调用,定时任务中止。
//If the `Runnable` throws an exception the repeated scheduling is aborted,
//i.e. the function will not be invoked any more.
system.scheduler.schedule(0 seconds, Duration.create(3, TimeUnit.SECONDS), new Runnable {
override def run(): Unit = {
UserTask.task2()
}
})