a4c85ed3ed5b018f3bef9aa671f31c78b111bb25.svn-base 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. package com.chinacreator.process.job;
  2. import java.util.List;
  3. import java.util.Map;
  4. import org.apache.log4j.Logger;
  5. import org.quartz.DisallowConcurrentExecution;
  6. import org.quartz.PersistJobDataAfterExecution;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import com.chinacreator.process.bean.MirrorBean;
  9. import com.chinacreator.process.dao.DictionaryDao;
  10. import com.chinacreator.process.dao.KafkaDao;
  11. import com.chinacreator.process.util.JsonUtil;
  12. import com.chinacreator.video.queue.MessageService;
  13. import com.chinacreator.video.queue.bean.MessagePipe;
  14. @PersistJobDataAfterExecution
  15. @DisallowConcurrentExecution
  16. public class KafkaMonitorJob {
  17. private Logger logger = Logger.getLogger(KafkaMonitorJob.class);
  18. @Autowired
  19. private DictionaryDao dictionaryDao;
  20. @Autowired
  21. private MessageService messageService;
  22. @Autowired
  23. private KafkaDao kafkaDao;
  24. public void doProcess() throws Exception {
  25. if(dictionaryDao.getValue("recivemq").equals("0")){
  26. logger.info("kafka监控开始启动");
  27. long beginTime = System.currentTimeMillis();
  28. List<MessagePipe> list = messageService.reciveBatchMessage("kafkamonitor", 500);
  29. logger.info("接收kafka监控队列花费时间:"+(System.currentTimeMillis()-beginTime));
  30. for (MessagePipe messagePipe : list) {
  31. Map<String, Object> body = messagePipe.getBody();
  32. String content = body.get("content").toString();
  33. logger.info(content);
  34. List<?> listMonitor = JsonUtil.jsonToList(content);
  35. for (Object mirrorBean : listMonitor) {
  36. logger.info(JsonUtil.objectToJson(mirrorBean));
  37. kafkaDao.save((MirrorBean)JsonUtil.jsonToBean(JsonUtil.objectToJson(mirrorBean), MirrorBean.class));
  38. }
  39. }
  40. }else{
  41. logger.info("kafka监控停止接收数据");
  42. }
  43. }
  44. public static void main(String[] args) {
  45. String str = " [{\"parttion\":\"0\",\"lastOffset\":\"17645076\",\"time\":\"20170623162234\"},{\"parttion\":\"1\",\"lastOffset\":\"17838541\",\"time\":\"20170623162234\"}]";
  46. List<?> listMonitor = JsonUtil.jsonToList(str);
  47. for (Object mirrorBean : listMonitor) {
  48. System.out.println(JsonUtil.objectToJson(mirrorBean));
  49. System.out.println(JsonUtil.objectToJson(JsonUtil.jsonToBean(JsonUtil.objectToJson(mirrorBean), MirrorBean.class)));
  50. }
  51. }
  52. }