java实时监听日志写入kafka

摘要:
目的实时监控目录下的日志文件。如果将任何新文件切换到新文件,则将其同步写入kafka,同时记录日志文件的行位置,以应对进程的异常退出。它可以从最后一个文件位置读取(考虑到效率,每100个条目记录一次,可以调整)源代码:[java]viewplaincopyimportjava。io。缓冲读取器;importjava.io。BufferedWriter;importjava.io。文件小鬼
目的
实时监听某目录下的日志文件,如有新文件切换到新文件,并同步写入kafka,同时记录日志文件的行位置,以应对进程异常退出,能从上次的文件位置开始读取(考虑到效率,这里是每100条记一次,可调整)
 
源码:
[java] view plain copy
 
  1. import java.io.BufferedReader;  
  2. import java.io.BufferedWriter;  
  3. import java.io.File;  
  4. import java.io.FileInputStream;  
  5. import java.io.FileNotFoundException;  
  6. import java.io.FileReader;  
  7. import java.io.FileWriter;  
  8. import java.io.IOException;  
  9. import java.io.LineNumberReader;  
  10. import java.io.PrintWriter;  
  11. import java.io.RandomAccessFile;  
  12. import java.net.NoRouteToHostException;  
  13. import java.util.ArrayList;    
  14. import java.util.Collection;    
  15. import java.util.List;    
  16. import java.util.Properties;    
  17. import java.util.Random;  
  18. import java.util.concurrent.Executors;  
  19. import java.util.concurrent.ScheduledExecutorService;  
  20. import java.util.concurrent.TimeUnit;  
  21.   
  22.   
  23.     
  24. import kafka.javaapi.producer.Producer;    
  25. import kafka.producer.KeyedMessage;    
  26. import kafka.producer.ProducerConfig;    
  27.   
  28.   
  29. /* 
  30.  * 自己在源服务器写生产者往kafka插入数据,注意文件"producer.properties放在linux下该jar文件同一目录 
  31.  * 监听某个目录下的文件数据然后写入kafka 
  32.  * nohup java -jar portallog_producer.jar portallog /var/apache/logs portallog.position  >/home/sre/portalhandler/handler.log 2>&1 & 
  33.  *  
  34.  *  
  35.  */  
  36. public class PortalLogTail_Line {    
  37.     
  38.     private Producer<String,String> inner;    
  39.     java.util.Random ran = new Random();  
  40.     public PortalLogTail_Line() throws FileNotFoundException, IOException {    
  41.         Properties properties = new Properties();    
  42.      //   properties.load(ClassLoader.getSystemResourceAsStream("producer.properties"));    
  43.         
  44.         properties.load(new FileInputStream("producer.properties"));    
  45.          
  46.         ProducerConfig config = new ProducerConfig(properties);   
  47.         
  48.         inner = new Producer<String, String>(config);    
  49.        
  50.     }    
  51.     
  52.         
  53.     public void send(String topicName,String message) {    
  54.         if(topicName == null || message == null){    
  55.             return;    
  56.         }    
  57.      //   KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message);    
  58.         //随机作为key,hash分散到各个分区  
  59.       KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,String.valueOf(ran.nextInt(9)),message);    
  60.      //   KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message,message);  
  61.         inner.send(km);    
  62.           
  63.     }    
  64.         
  65.     public void send(String topicName,Collection<String> messages) {    
  66.         if(topicName == null || messages == null){    
  67.             return;    
  68.         }    
  69.         if(messages.isEmpty()){    
  70.             return;    
  71.         }    
  72.         List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();    
  73.         for(String entry : messages){    
  74.             KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,entry);    
  75.             kms.add(km);    
  76.         }    
  77.         inner.send(kms);    
  78.     }    
  79.         
  80.     public void close(){    
  81.         inner.close();    
  82.     }    
  83.   
  84.       
  85.       
  86.     public String getNewFile(File file)  
  87.     {  
  88.         File[] fs=file.listFiles();  
  89.         long maxtime=0;  
  90.         String newfilename="";  
  91.         for (int i=0;i<fs.length;i++)  
  92.         {  
  93.             if (fs[i].lastModified()>maxtime && fs[i].getName().contains("access"))  
  94.             {  
  95.                 maxtime=fs[i].lastModified();  
  96.                 newfilename=fs[i].getAbsolutePath();  
  97.                   
  98.             }  
  99.         }  
  100.         return newfilename;  
  101.     }  
  102.     //写入文件名及行号  
  103.     public void writePosition(String path,int rn,String positionpath)  
  104.     {  
  105.         try {  
  106.                BufferedWriter out = new BufferedWriter(new FileWriter(positionpath));  
  107.                out.write(path+","+rn);  
  108.                out.close();  
  109.         } catch (IOException e) {  
  110.         }  
  111.     }  
  112.     LineNumberReader randomFile=null;  
  113.      String newfile=null;  
  114.      String thisfile=null;  
  115.      String prefile=null;  
  116.      int ln=0;  
  117.      int beginln=0;  
  118.     public void realtimeShowLog(final File file,final String topicname, final String positionpath) throws IOException{       
  119.         
  120.         //启动一个线程每1秒钟读取新增的日志信息       
  121.        new Thread(new Runnable(){       
  122.             public void run() {       
  123.                 thisfile=getNewFile(file);  
  124.                prefile=thisfile;  
  125.                 //访问position文件,如果记录了文件路径,及行号,则定位,否则使用最新的文件  
  126.                 try {  
  127.                     BufferedReader br=new BufferedReader(new FileReader(positionpath));  
  128.                     String line=br.readLine();  
  129.                     if (line!=null &&line.contains(","))  
  130.                     {  
  131.                         thisfile=line.split(",")[0];  
  132.                          prefile=thisfile;  
  133.                          beginln=Integer.parseInt(line.split(",")[1]);  
  134.                     }  
  135.                       
  136.                       
  137.                 } catch (FileNotFoundException e2) {  
  138.                     // TODO Auto-generated catch block  
  139.                     e2.printStackTrace();  
  140.                 }  
  141.                  catch (IOException e2) {  
  142.                         // TODO Auto-generated catch block  
  143.                         e2.printStackTrace();  
  144.                     }  
  145.                   
  146.                  //指定文件可读可写       
  147.                      try {  
  148.                         randomFile = new LineNumberReader(new FileReader(thisfile));  
  149.                     } catch (FileNotFoundException e) {  
  150.                         // TODO Auto-generated catch block  
  151.                         e.printStackTrace();  
  152.                     }       
  153.               while (true)  
  154.               {  
  155.                  try {  
  156.                     Thread.sleep(100);  
  157.                       
  158.                 } catch (InterruptedException e1) {  
  159.                     // TODO Auto-generated catch block  
  160.                     e1.printStackTrace();  
  161.                 }  
  162.                  try {       
  163.                       //获得变化部分的       
  164.                     //  randomFile.seek(lastTimeFileSize);       
  165.                       String tmp = "";       
  166.                       while( (tmp = randomFile.readLine())!= null) {    
  167.                          int currln=randomFile.getLineNumber();  
  168.                          //beginln默认为0  
  169.                          if (currln>beginln)  
  170.                              send(topicname,new String(tmp.getBytes("utf8")));  
  171.                             
  172.                           ln++;  
  173.                             
  174.                           //每发生一条写一次影响效率,连续发100次后再记录位置  
  175.                           if (ln>100)  
  176.                              {  
  177.                              writePosition(thisfile,currln,positionpath);  
  178.                              ln=0;  
  179.                              }  
  180.                        
  181.                       }     
  182.                      thisfile=getNewFile(file);  
  183.                      if(!thisfile.equals(prefile))  
  184.                        
  185.                      {  
  186.                         randomFile.close();  
  187.                        randomFile = new LineNumberReader(new FileReader(thisfile));  
  188.                       prefile=thisfile;  
  189.                      beginln=0;  
  190.                      }  
  191.                         
  192.                        
  193.                   } catch (IOException e) {       
  194.                       throw new RuntimeException(e);       
  195.                   }       
  196.               }  
  197.         }}).start();       
  198.     }       
  199.         
  200.     /**  
  201.      * @param args  
  202.      * @throws Exception  
  203.      */    
  204.     public static void main(String[] args) throws Exception {    
  205.         PortalLogTail_Line producer = new PortalLogTail_Line();     
  206.         if (args.length!=3)  
  207.         {  
  208.             System.out.println("usage:topicname pathname positionpath");  
  209.             System.exit(1);  
  210.         }  
  211.         String topicname=args[0];  
  212.         String pathname=args[1];  
  213.         String positionpath=args[2];   
  214.         final File tmpLogFile = new File(pathname);  
  215.         producer.realtimeShowLog(tmpLogFile,topicname,positionpath);   
  216.           
  217.      
  218.     
  219.     }    
  220.     
  221. }   
 
producer.properties文件放在同级目录下
[html] view plain copy
 
  1. metadata.broker.list=xxx:10909,xxx:10909  
  2.   
  3. # name of the partitioner class for partitioning events; default partition spreads data randomly  
  4. #partitioner.class=  
  5.   
  6. # specifies whether the messages are sent asynchronously (async) or synchronously (sync)  
  7. producer.type=sync  
  8. #producer.type=async  
  9.   
  10. # specify the compression codec for all data generated: none , gzip, snappy.  
  11. # the old config values work as well: 0, 1, 2 for none, gzip, snappy, respectivally  
  12. compression.codec=none  
  13. #compression.codec=gzip  
  14.   
  15. # message encoder  
  16. serializer.class=kafka.serializer.StringEncoder  


 
测试 最后执行:
[java] view plain copy
 
  1. nohup java -jar portallog_producer.jar portallog /var/apache/logs portallog.position  >/home/sre/portalhandler/handler.log 2>&1 &
  2. 转:http://blog.csdn.net/u011750989/article/details/21237251

免责声明:文章转载自《java实时监听日志写入kafka》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇nodejs qr-image 生成二维码sql server 触发器下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Java上传图片到Ftp,包含上传后文件大小为0的问题和Properties配置文件的读取

准备工作:需要使用coomos-net jar包。下载地址 一、 上传图片到FTP,文件大小为0的问题,解决:将ftp模式修改为Passive模式就可以了。 //将ftp模式修改为Passive模式 ftpClient.enterLocalPassiveMode(); 二、配置文件的操作,具体介绍请看 Java中Properties类的用法总结 1.使用...

java 调用apache.commons.codec的包简单实现MD5加密

转自:https://blog.csdn.net/mmd1234520/article/details/70210002/ 1 importjava.security.MessageDigest; 2 importjava.security.NoSuchAlgorithmException; 3 4 import org.apache....

c# 通过程序修改hosts文件

1 根据ip替换 var OSInfo =Environment.OSVersion; string pathpart = "hosts"; if (OSInfo.Platform ==PlatformID.Win32NT) { //is windows NT pathpart = "system32\drivers\etc\hosts";...

EXCEL导出,列表宽度大于内容列

//TIPS:列宽设置在最后 public static string Export<T>(string rootPath, List<T> list, string fileName, IDictionary<string, string> header, Action<int, string, string,...

一款不错的多选下拉列表利器—— Ext.ux.form.SuperBoxSelect

       在B/S系统中,下拉列表(select/dropdownlist/combobox)的应用随处可见,为了增强用户体验,开发人员也常常会做一些带联想功能的下拉列表,         特别是数据项比较多的时候,用户筛选起来就会很容易。         如果考虑多选的场景,我想以下的实现方式很多时候是能够满足要求的: 带复选框(checkbox)...

多线程使用注意

命名 来源:https://www.cnblogs.com/guozp/p/10344446.html 我们在创建线程池的时候,一定要给线程池名字,如下这种写法,线程是默认直接生成的: public static void main(String[] args) { ExecutorService executorService = E...