大数据离线分析平台 JavaSDK数据收集引擎编写

摘要:
importjava.util.concurrent.LinkedBlockingQueue;否则,它将返回false(参数异常&=null&&=null&aamp!&map=newHashMap&lt,版本);returntrue;

JavaSDK设计规则

 

 JavaSDK提供两个事件触发方法,分别为onChargeSuccess和onChargeRefund。我们在java sdk中通过一个单独的线程来发送线程数据,这样可以减少对业务系统的延时性。

SDK测试

  启动集群上的hdfs+nginx+flume进程,通过模拟数据的发送然后将数据发送到nginx服务器中,查看最终是否在hdfs中有数据的写入。

命令:

   start-dfs.sh: 启动hdfs命令

   su root:切换用户

   service nginx restart: 启动nginx进程

   启动flume进程:

       进入flume安装根目录,执行命令:


flume-ng agent --conf ./conf/ --conf-file ./conf/test2.conf --name agent &


 工程目录结构

大数据离线分析平台 JavaSDK数据收集引擎编写第1张

AnalyticsEngineSDK如下:
package com.kk.ae.sdk;

import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

/*
 * 分析引擎sdk java服务器数据收集
 * */
public class AnalyticsEngineSDK {
    
    //日志记录对象
    private static final Logger log=Logger.getGlobal();
    //请求url的主体部分
    public static final String accessUrl="http://hadoop-001:8090/kkImg.gif";
    public static final String platformName="java_server";
    public static final String sdkName="jdk";
    private static final String version = "1";
    /**
     * 触发订单支付成功事件,发送事件数据到服务器
     * 
     * @param orderId
     *            订单支付id
     * @param memberIdd
     *            订单支付会员id
     * @return 如果发送数据成功(加入到发送队列中),那么返回true;否则返回false(参数异常&添加到发送队列失败).
     * @throws InterruptedException 
     */
    public static boolean chargeSuccess(String orderId,String memberId) throws InterruptedException {
        
        if (orderId!=null&&!orderId.isEmpty()&&memberId!=null&&!memberId.isEmpty()) {
            Map<String, String> map=new HashMap<String,String>();
            map.put("u_mid", memberId);
            map.put("oid", orderId);
            map.put("c_time", String.valueOf(System.currentTimeMillis()));
            map.put("ver", version);
            map.put("en", "e_cs");
            map.put("p1", platformName);
            map.put("sdk", sdkName);
            
            //创建url
            String url= buildUrl(map);
            // 发送url&将url加入到队列
            SendDataMonitor.addSendUrl(url);
            System.out.println(url);
            return true;
        } else {
            log.log(Level.WARNING, "订单id和会员id不能为空");
            return false;
        }
        
    }
    /**
     * 触发订单退款事件,发送退款数据到服务器
     * 
     * @param orderId
     *            退款订单id
     * @param memberIdd
     *            退款会员id
     * @return 如果发送数据成功,返回true。否则返回false。
     * @throws InterruptedException 
     */
    public static boolean chargeRefund(String orderId,String memberId) throws InterruptedException {
        if (orderId!=null&&!orderId.isEmpty()&&memberId!=null&&!memberId.isEmpty()) {
            Map<String, String> map=new HashMap<String,String>();
            map.put("u_mid", memberId);
            map.put("oid", orderId);
            map.put("c_time", String.valueOf(System.currentTimeMillis()));
            map.put("ver", version);
            map.put("en", "e_cr");
            map.put("p1", platformName);
            map.put("sdk", sdkName);
            
            //创建url
            String url= buildUrl(map);
            // 发送url&将url加入到队列
            SendDataMonitor.addSendUrl(url);
            System.out.println(url);
            return true;
        } else {
            log.log(Level.WARNING, "订单id和会员id不能为空");
            return false;
        }
        
    }
    private static String buildUrl(Map<String, String> map) {
        
        StringBuffer stringBuffer=new StringBuffer();
        stringBuffer.append(accessUrl).append("?");
        for(Map.Entry<String, String> entry:map.entrySet()) {
            if (entry.getKey()!=null&&!entry.getKey().isEmpty()&&entry.getValue()!=null&&!entry.getValue().isEmpty()) {
                {
                    try {
                        stringBuffer.append(entry.getKey().trim()).append("=").append(URLEncoder.encode(entry.getValue().trim(),"utf-8")).append("&");
                    } catch (UnsupportedEncodingException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    
                }
            }
            
        }
        return stringBuffer.substring(0, stringBuffer.length() - 1);
    }
        
    
    
}
SendDataMonitor 如下:
package com.kk.ae.sdk;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.ProtocolException;
import java.net.URL;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * 发送url数据的监控者,用于启动一个单独的线程来发送数据
 * 
 * @author gerry
 *
 */
public class SendDataMonitor {
    //收集日志
    public static final Logger log=Logger.getGlobal();
    // 队列,用户存储发送url
    public static final BlockingQueue<String> queue=new LinkedBlockingQueue<String>();
    //用于单例的一个类对象
    private static SendDataMonitor monitor=null;
    
    private SendDataMonitor() {
        // 私有构造方法,进行单列模式的创建
    }
    
    
    public static SendDataMonitor getMonitor() {
        if (monitor==null) {
            synchronized (SendDataMonitor.class) {
                if (monitor==null) {
                    monitor=new SendDataMonitor();
                    Thread thread=new Thread(new Runnable() {
                        
                        @Override
                        public void run() {
                        // TODO Auto-generated method stub
                        SendDataMonitor.monitor.run();    
                        
                        }
                    });
                    thread.start();
                }    
            }
        } 
        return monitor;
    }


    protected void run() {
        while (true) {
            try {
                String url=this.queue.take();
                // 正式的发送url
                HttpRequestUtil.sendData(url);
            } catch (Throwable e) {
                log.log(Level.WARNING, "发送url异常", e);
            }    
        }
    }


    public static void setMonitor(SendDataMonitor monitor) {
        SendDataMonitor.monitor = monitor;
        
    }


    /**
     * 添加一个url到队列中去
     * 
     * @param url
     * @throws InterruptedException
     */
    public static void addSendUrl(String url) throws InterruptedException {
         getMonitor().queue.put(url);
    
    }
    /**
     * 内部类,用户发送数据的http工具类
     * 
     * @author gerry
     *
     */
    public static class HttpRequestUtil{
        /**
         * 具体发送url的方法
         * 
         * @param url
         * @throws IOException
         */
        public static void sendData(String url) throws IOException {
            HttpURLConnection con=null;
            BufferedReader bf=null;
            try {
                URL obj=new URL(url);
                con=(HttpURLConnection) obj.openConnection();
                // 设置连接参数
                con.setConnectTimeout(5000);//连接过期时间
                con.setReadTimeout(5000);//读取数据过期时间
                con.setRequestMethod("GET");//设置请求类型为get
                System.out.println("发送url:" + url);
                // 发送连接请求
                bf=new BufferedReader(new InputStreamReader(con.getInputStream()));
                
            } finally {
                try {
                    if (bf!=null) {
                        bf.close();
                        
                    }
                } catch (Throwable e) {
                    // TODO: handle exception
                    
                }
                try {
                    con.disconnect();
                } catch (Throwable e) {
                    // TODO: handle exception
                }
            }
        }
    
    }

}

测试类:

package com.kk.ae.sdk;

public class Test {
    
public static void main(String[] args) {
    try {
        AnalyticsEngineSDK.chargeSuccess("order3516", "0958");
        AnalyticsEngineSDK.chargeRefund("kk3", "9009");
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}
}

免责声明:文章转载自《大数据离线分析平台 JavaSDK数据收集引擎编写》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇js回调与异步加载的用法电子邮件的正则表达式下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

WebService的两种用户验证方式

年关将至,整理一下ME写的资料,发现有一篇未发表的,特共享出来,顺祝大家新年快乐了~~~ 1,使用SoapHeader传递和验证用户  Web Service端的代码:  1.1先创建一个继承自System.Web.Services.Protocols.SoapHeader     CredentialSoapHeader类:    public clas...

二十、异常捕获及处理详解

代码中被[]包含的表示可选,|符号分开的表示可选其一。 需求背景 我们在写存储过程的时候,可能会出现下列一些情况: 插入的数据违反唯一约束,导致插入失败 插入或者更新数据超过字段最大长度,导致操作失败 update影响行数和期望结果不一致 遇到上面各种异常情况的时,可能需要我们能够捕获,然后可能需要回滚当前事务。 本文主要围绕异常处理这块做详细的介绍。...

Oracle查询优化改写--------------------操作多个表

一、union all与空字符串 二、组合相关行 三、in 、exists、inter join 、left join 、right join 、full join 之间的区别   ’inner  join 返回两表相符合的数据    left    join  以左表为主表,左表返回所有的数据,在右表中只返回与左表匹配的数据   right   join...

NIO:Buffer 详解

如你所见,在NIO中,数据的读写操作始终是与缓冲区相关联的。Channel将数据读入缓冲区,然后我们又从缓冲区访问数据。写数据时,首先将要发送的数据按顺序填入缓冲区。基本上,缓冲区只是一个列表,它的所有元素都是基本数据类型(通常为字节型)。缓冲区是定长的,它不像一些类那样可以扩展容量(例如,List,StringBuffer等)。注意,ByteBuffe...

Azure Data Factory(一)入门简介

一,引言   今天分享一个新的Azure 服务-----Azure Data Factory(Azure 数据工厂),怎么理解,参考根据官方解释-----数据工厂解释:大数据需要可以启用协调和操作过程以将这些巨大的原始数据存储优化为可操作的业务见解的服务。 Azure 数据工厂是为这些复杂的混合提取-转换-加载 (ETL)、提取-加载-转换 (ELT) 和...

Gluster的搭建和使用

Gluster的搭建和使用 序言我们为什么要去使用分布式存储,在一家大型公司或者大规模的集群中,大家可能会经常遇到一个问题,我的数据怎么存放,放在那,数据空间不够了怎么办,这些问题经常困扰着我们。 笔者是在电信的一个部门工作的,我们的环境比较复杂。环境有NAS,各种NFS,还有为了高可用搭建的HA上面跑的共享目录,每次我们遇到的一个最大的问题就是,哪哪哪的...