`
darkjune
  • 浏览: 301296 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

SymmetricDS 2.2.5 undeploy时symmetricScheduler job线程杀不掉分析

阅读更多

SymmetricDS的同步机制是定时周期性同步,我们项目根据业务需要,只需要客户在UI激活一次replication时候才开始同步,所以这里我们的设计是每次激活时,用symmetric自己的deploy/undeploy机制让其在启动时候自动跑replication任务。

 

最近遇到一个问题, 在使用中,发现当数据同步时间间隔设在1小时以内时,运行一天以上会发现大量的SymmetricSchedule线程积压在JVM中没有释放,怀疑是Symmetric 2.2.5中在undeploy时释放线程有问题。

为了解决这个问题,先看下SymmetricDS中其需要的定时任务都定义在哪里。

 

在Symmetric engine中,整个symmetricDS的入口类在AbstractSymmetricEngine,在这个类的启动方法如下:

 

    public synchronized boolean start() {

        setup();
        
        if (isConfigured()) {
                  ...
                    getTriggerRouterService().syncTriggers();
                    heartbeat(false);
                    jobManager.startJobs();
                   ...
                    started = true;
                } finally {
                    starting = false;
                }
                
                return true;
            } else {
                return false;
            }
        } else {
           ...
        }
    }
 

 

可以看到其内置的job是由jobManager启动的,再看其实现类JobManager代码片段:

 

  private List<IJob> jobs;  

  public synchronized void startJobs() {
        for (IJob job : jobs) {
            if (job.isAutoStartConfigured()) {
                job.start();
            } else {
                log.info("JobNoAutoStart", job.getName());
            }
        }
    }
 可以看到是直接调用集合jobs中的job实例,该实例在SymmetricDS 2 中是由spring注入的,其spring xml配置如下:

 

 

    <bean id="jobManager" class="org.jumpmind.symmetric.job.JobManager">
        <property name="jobs">
            <list>
                <ref bean="job.routing" />
                <ref bean="job.push" />
                <ref bean="job.pull" />
                <ref bean="job.purge.outgoing" />
                <ref bean="job.purge.incoming" />
                <ref bean="job.purge.datagaps" />
                <ref bean="job.stat.flush" />
                <ref bean="job.synctriggers" />
                <ref bean="job.heartbeat" />
                <ref bean="job.watchdog" />
            </list>
        </property>
        <property name="taskScheduler" ref="symmetricScheduler" />
    </bean>
    <bean id="job.routing" parent="job.abstract" class="org.jumpmind.symmetric.job.RouterJob">
        <property name="routingService" ref="routingService" />
        <property name="autoStartParameterName" value="start.route.job" />
    </bean>
 以上就是其内置的job,支持周期性运行和定时运行,以便进行数据库同步。这些job就是在undeploy时候没有杀掉的线程,由于我们关注的是undeploy时这些job的卸载情况,回到AbstractSymmetricEngine:

 

 

private ThreadPoolTaskScheduler taskScheduler;    
public synchronized void stop() {
        ...
        jobManager.stopJobs();
        getRouterService().stop();
        started = false;
        starting = false;
    }
    public synchronized void destroy () {
        stopJobs();
    }
 回到JobManager:

 

 

    public synchronized void stopJobs() {
        for (IJob job : jobs) {
            job.stop();
        }      
    }
 

 

jobManager直接遍历每个job并调用其stop方法。

所有job都有个超类AbstractJob,这个类的stop方法如下:

 

    public boolean stop() {
        boolean success = false;
        if (this.scheduledJob != null) {
            success = this.scheduledJob.cancel(true);
            this.scheduledJob = null;
            if (success) {
                log.info("JobCancelled", jobName);
                started = false;                
            } else {
                log.warn("JobFailedToCancel", jobName);
            }
        }
        return success;
    }
 这里只是调用了线程的cancel方法,在我们实际使用中,只是cancel并没有终结掉这些线程,这是我们JVM中的thread dump:

 

 

"symmetricScheduler-14" prio=6 tid=0x49d7f800 nid=0x13b4 waiting on condition [0x4e2ef000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x1d6873b8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
at java.lang.Thread.run(Thread.java:662)

 

找到这里,我们知道这是其释放线程的一个bug,所以我直接去找了一下symmetricDS的bug track里有没有解决,发现这是2.2.*中的一个bug,在2.4.0以后的版本中已经fix,其JIRA地址在:

Tomcat shutdown hangs when SymmetricDS is deployed as a war 写道
http://jira.codehaus.org/browse/SYMMETRICDS-466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel

 

这个fix主要是在JobManager中加入了调用spring的线程池的shutdow方法:

    public synchronized void destroy () {
        stopJobs();
        if (taskScheduler != null) {
            taskScheduler.shutdown();
        }
    }

 

至此,在项目中使用symmetricDS新的代码,undeploy后线程都被杀掉,重新deploy后没有多余的job线程运行。

 

我们的启动方式:

没有在配置文件中配SymmetricDS需要的.cron项,直接让其按照线程的scheduleWithFixedDelay方式运行。该任务模式的配置在symmetric.properties文件中,在symmetric源码(AbstractJob)中可以清晰的看到如何获取该配置文件的值:

this.cronExpression = parameterService.getString(jobName + ".cron", null);

 之后在该类的启动线程方法中会有判断:

    public void start() {
        if (this.scheduledJob == null) {
            log.info("JobStarting", jobName);
            if (!StringUtils.isBlank(cronExpression)) {
                this.scheduledJob = taskScheduler.schedule(this, new CronTrigger(cronExpression));
                started = true;
            } else {

                int startDelay = randomTimeSlot.getRandomValueSeededByExternalId();
                if (this.timeBetweenRunsInMs > 0) {
                    this.scheduledJob = taskScheduler.scheduleWithFixedDelay(this,
                            new Date(System.currentTimeMillis() + startDelay),
                            this.timeBetweenRunsInMs);
                    started = true;
                } else {
                    log.error("JobFailedToSchedule", jobName);
                }
            }
        }
    }

 

 

下面是正常后的JVM 线程截图:

 

 
  • 大小: 90.8 KB
分享到:
评论
2 楼 darkjune 2014-12-19  
他整个同步机制官方网站文档都有,同步时是在临时文件夹将需要发送的数据变成了csv文件。如果同时更新同一张表,有一个filter机制来放弃重复更新,filter规则支持自定义。
1 楼 yuanfangren123 2014-12-06  
您好,我目前只是配置成功了。我想问下这个软件他在同步的时候数据是如何传输的?有没有形成文件,通过文件来传输?还有就是他如何避免两表双向同步的时候触发器之间的来回触发问题?您能说下他的源码的大体流程吗?
您还有他的一些其他的相关资料吗?

相关推荐

Global site tag (gtag.js) - Google Analytics