1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162 |
- package com.chinacreator.process.job;
- import java.util.List;
- import java.util.Map;
- import org.apache.log4j.Logger;
- import org.quartz.DisallowConcurrentExecution;
- import org.quartz.PersistJobDataAfterExecution;
- import org.springframework.beans.factory.annotation.Autowired;
- import com.chinacreator.process.bean.MirrorBean;
- import com.chinacreator.process.dao.DictionaryDao;
- import com.chinacreator.process.dao.KafkaDao;
- import com.chinacreator.process.util.JsonUtil;
- import com.chinacreator.video.queue.MessageService;
- import com.chinacreator.video.queue.bean.MessagePipe;
- @PersistJobDataAfterExecution
- @DisallowConcurrentExecution
- public class KafkaMonitorJob {
-
- private Logger logger = Logger.getLogger(KafkaMonitorJob.class);
-
- @Autowired
- private DictionaryDao dictionaryDao;
-
- @Autowired
- private MessageService messageService;
-
- @Autowired
- private KafkaDao kafkaDao;
-
- public void doProcess() throws Exception {
- if(dictionaryDao.getValue("recivemq").equals("0")){
- logger.info("kafka监控开始启动");
- long beginTime = System.currentTimeMillis();
- List<MessagePipe> list = messageService.reciveBatchMessage("kafkamonitor", 500);
- logger.info("接收kafka监控队列花费时间:"+(System.currentTimeMillis()-beginTime));
- for (MessagePipe messagePipe : list) {
- Map<String, Object> body = messagePipe.getBody();
- String content = body.get("content").toString();
- logger.info(content);
- List<?> listMonitor = JsonUtil.jsonToList(content);
- for (Object mirrorBean : listMonitor) {
- logger.info(JsonUtil.objectToJson(mirrorBean));
- kafkaDao.save((MirrorBean)JsonUtil.jsonToBean(JsonUtil.objectToJson(mirrorBean), MirrorBean.class));
- }
- }
- }else{
- logger.info("kafka监控停止接收数据");
- }
- }
-
- public static void main(String[] args) {
- String str = " [{\"parttion\":\"0\",\"lastOffset\":\"17645076\",\"time\":\"20170623162234\"},{\"parttion\":\"1\",\"lastOffset\":\"17838541\",\"time\":\"20170623162234\"}]";
- List<?> listMonitor = JsonUtil.jsonToList(str);
- for (Object mirrorBean : listMonitor) {
- System.out.println(JsonUtil.objectToJson(mirrorBean));
- System.out.println(JsonUtil.objectToJson(JsonUtil.jsonToBean(JsonUtil.objectToJson(mirrorBean), MirrorBean.class)));
- }
- }
- }
|