package com.chinacreator.process.job;

import java.util.List;
import java.util.Map;

import org.apache.log4j.Logger;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.PersistJobDataAfterExecution;
import org.springframework.beans.factory.annotation.Autowired;

import com.chinacreator.process.bean.MirrorBean;
import com.chinacreator.process.dao.DictionaryDao;
import com.chinacreator.process.dao.KafkaDao;
import com.chinacreator.process.util.JsonUtil;
import com.chinacreator.video.queue.MessageService;
import com.chinacreator.video.queue.bean.MessagePipe;

/**
 * Drains the {@code kafkamonitor} queue and stores every monitoring entry it
 * carries through {@link KafkaDao}.
 *
 * <p>Each message body is expected to hold a {@code content} entry whose string
 * form is a JSON array; every element of that array is converted to a
 * {@link MirrorBean} and saved.
 *
 * <p>NOTE(review): the class carries Quartz annotations but does not implement
 * a Quartz {@code Job} interface, so {@link #doProcess()} is presumably invoked
 * through a scheduler adapter configured elsewhere - confirm.
 */
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class KafkaMonitorJob {

    /** Queue name the monitoring messages are read from. */
    private static final String MONITOR_QUEUE = "kafkamonitor";

    /** Maximum number of messages requested per run. */
    private static final int BATCH_SIZE = 500;

    /** Dictionary key of the receive switch; the value "0" enables receiving. */
    private static final String RECEIVE_SWITCH_KEY = "recivemq";

    private static final Logger logger = Logger.getLogger(KafkaMonitorJob.class);

    @Autowired
    private DictionaryDao dictionaryDao;

    @Autowired
    private MessageService messageService;

    @Autowired
    private KafkaDao kafkaDao;

    /**
     * Reads one batch of messages from the monitoring queue and persists the
     * entries found in each of them, provided the {@code recivemq} dictionary
     * value is {@code "0"}. Any other value (including a missing one) only logs
     * that receiving is stopped.
     *
     * <p>Messages without a body or without a {@code content} entry are skipped
     * with a warning instead of aborting the rest of the batch.
     *
     * @throws Exception if receiving, JSON conversion or persistence fails
     */
    public void doProcess() throws Exception {
        // Constant-first comparison: a missing dictionary entry must not raise an NPE.
        if (!"0".equals(dictionaryDao.getValue(RECEIVE_SWITCH_KEY))) {
            logger.info("kafka监控停止接收数据");
            return;
        }

        logger.info("kafka监控开始启动");
        long beginTime = System.currentTimeMillis();
        List<MessagePipe> list = messageService.reciveBatchMessage(MONITOR_QUEUE, BATCH_SIZE);
        logger.info("接收kafka监控队列花费时间:" + (System.currentTimeMillis() - beginTime));

        if (list == null) {
            return;
        }
        for (MessagePipe messagePipe : list) {
            persistMessage(messagePipe);
        }
    }

    /**
     * Saves every monitoring entry contained in a single queue message.
     *
     * @param messagePipe the received message; skipped when it is {@code null}
     *                    or has no {@code content} entry in its body
     */
    private void persistMessage(MessagePipe messagePipe) {
        Map<?, ?> body = (messagePipe == null) ? null : messagePipe.getBody();
        Object rawContent = (body == null) ? null : body.get("content");
        if (rawContent == null) {
            logger.warn("kafka monitor message without content skipped");
            return;
        }

        String content = rawContent.toString();
        logger.info(content);

        List<?> listMonitor = JsonUtil.jsonToList(content);
        if (listMonitor == null) {
            logger.warn("kafka monitor message content is not a JSON list, skipped");
            return;
        }
        for (Object mirrorBean : listMonitor) {
            // Serialize once and reuse the text for both the log line and the conversion.
            String json = JsonUtil.objectToJson(mirrorBean);
            logger.info(json);
            kafkaDao.save((MirrorBean) JsonUtil.jsonToBean(json, MirrorBean.class));
        }
    }

    /**
     * Manual smoke check of the JSON round trip: parses a sample monitoring
     * payload and prints each element before and after conversion to
     * {@link MirrorBean}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String str = " [{\"parttion\":\"0\",\"lastOffset\":\"17645076\",\"time\":\"20170623162234\"},{\"parttion\":\"1\",\"lastOffset\":\"17838541\",\"time\":\"20170623162234\"}]";
        List<?> listMonitor = JsonUtil.jsonToList(str);
        for (Object mirrorBean : listMonitor) {
            String json = JsonUtil.objectToJson(mirrorBean);
            System.out.println(json);
            System.out.println(JsonUtil.objectToJson(JsonUtil.jsonToBean(json, MirrorBean.class)));
        }
    }
}