基于Filebeat+Kafka+Flink仿天猫双11实时交易额

摘要:
id=1588506573420812062&wfr=spider&for=pc)2.仿天猫双11实时交易额技术架构利用Linuxshell自动化模拟每秒钟产生一条交易额数据,数据内容为用户id,购买商品的付款金额,用户所在城市及所购买的商品技术架构上利用Filebeat去监控每生产的一条交易额记录,Filebeat将交易额输出到Kafka,然后编写Flink客户端程序去实时消费Kafka数据,对数据进行两块计算,一块是统计实时总交易额,一块是统计不同城市的实时交易额技术架构图3.具体实现3.1.模拟交易额数据double11.sh脚本#!

1. 写在前面

在大数据实时计算方向,天猫双11的实时交易额是最具权威性的,当然技术架构也是相当复杂的,不是本篇博客的简单实现,因为天猫双11的数据是多维度多系统,实时粒度更微小的。当然在技术的总体架构上是相近的,主要的组件都是用到大数据实时计算组件Flink(当然阿里是用了基于Flink深度定制和优化改装的Blink)。下图是天猫双11实时交易额的大体架构模型及数据流向(参照https://baijiahao.baidu.com/s?id=1588506573420812062&wfr=spider&for=pc)
基于Filebeat+Kafka+Flink仿天猫双11实时交易额第1张

2. 仿天猫双11实时交易额技术架构

利用Linux shell自动化模拟每秒钟产生一条交易额数据,数据内容为用户id,购买商品的付款金额,用户所在城市及所购买的商品
基于Filebeat+Kafka+Flink仿天猫双11实时交易额第2张

技术架构上利用Filebeat去监控每生产的一条交易额记录,Filebeat将交易额输出到Kafka(关于Filebeat和kafka的安装或应用请参照之前的博客),然后编写Flink客户端程序去实时消费Kafka数据,对数据进行两块计算,一块是统计实时总交易额,一块是统计不同城市的实时交易额
技术架构图
基于Filebeat+Kafka+Flink仿天猫双11实时交易额第3张

3.具体实现

3.1. 模拟交易额数据double11.sh脚本

#!/bin/bash 
i=1
for i in $(seq 1 60)
	do
        customernum=`openssl rand -base64 8 | cksum | cut -c1-8`
        pricenum=`openssl rand -base64 8 | cksum | cut -c1-4`
        citynum=`openssl rand -base64 8 | cksum | cut -c1-2`
        itemnum=`openssl rand -base64 8 | cksum | cut -c1-6`
        echo "customer"$customernum","$pricenum",""city"$citynum",""item"$itemnum >> /home/hadoop/tools/double11/double11.log
        sleep 1
    done

将double11.sh放入Linux crontab

#每分钟执行一次
* * * * * sh /home/hadoop/tools/double11/double11.sh

3.2. 实时监控double11.log

Filebeat实时监控double11.log产生的每条交易额记录,将记录实时流向到Kafka的topic,这里只需要对Filebeat的beat-kafka.yml做简单配置,kafka只需要启动就好
基于Filebeat+Kafka+Flink仿天猫双11实时交易额第4张

3.3. 核心:编写Flink客户端程序

这里将统计实时总交易额和不同城市的实时交易额区分写成两个类(只提供Flink Java API)
需要导入的maven依赖

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.0.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.0.0</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
            <version>1.0.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>

    </dependencies>

统计实时总交易额代码

package com.fastweb;

import java.util.Properties;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class Double11Sum {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.enableCheckpointing(1000);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.184.12:9092");
        properties.setProperty("zookeeper.connect", "192.168.184.12:2181");
        properties.setProperty("group.id", "test");

        FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<String>("test", new SimpleStringSchema(),
                properties);

        DataStream<String> stream = env.addSource(myConsumer);

        DataStream<Tuple2<String, Integer>> counts = stream.flatMap(new LineSplitter()).keyBy(0).sum(1);
        counts.print();

        env.execute("Double 11 Real Time Transaction Volume");
    }

    //统计总的实时交易额
    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            JSONObject object = JSONObject.parseObject(value);
            String message = object.getString("message");
            Integer price = Integer.parseInt(message.split(",")[1]);
            out.collect(new Tuple2<String, Integer>("price", price));
        }
    }
}

统计不同城市的实时交易额

package com.fastweb;

import java.util.Properties;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class Double11SumByCity {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.enableCheckpointing(1000);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.184.12:9092");
        properties.setProperty("zookeeper.connect", "192.168.184.12:2181");
        properties.setProperty("group.id", "test");

        FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<String>("test", new SimpleStringSchema(),
                properties);

        DataStream<String> stream = env.addSource(myConsumer);

        DataStream<Tuple2<String, Integer>> cityCounts = stream.flatMap(new CitySplitter()).keyBy(0).sum(1);
        cityCounts.print();

        env.execute("Double 11 Real Time Transaction Volume");
    }

    //按城市分类汇总实时交易额
    public static final class CitySplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            JSONObject object = JSONObject.parseObject(value);
            String message = object.getString("message");
            Integer price = Integer.parseInt(message.split(",")[1]);
            String city = message.split(",")[2];
            out.collect(new Tuple2<String, Integer>(city, price));
        }
    }
}

代码解释:这里可以方向两个类里面只有flatMap的对数据处理的内部类不同,但两个内部类的结构基本相同,在内部类里面利用fastjson解析了一层获取要得到的数据,这是因为经过Filebeat监控的数据是json格式的,Filebeat这样实现是为了在正式的系统上确保每条数据的来源IP,时间戳等信息

3.4. 验证

启动Double11Sum类的main方法就可以得到实时的总交易额,按城市分类的实时交易额也一样,这个结果是实时更新的,每条记录都是新的
基于Filebeat+Kafka+Flink仿天猫双11实时交易额第5张

免责声明:文章转载自《基于Filebeat+Kafka+Flink仿天猫双11实时交易额》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Android Studio 快捷键整理一个网站应该具备的功能!下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

携程结合用户画像

用户画像作为“大数据”的核心组成部分,在众多互联网公司中一直有其独特的地位。作为国内旅游OTA的领头羊,携程也有着完善的用户画像平台体系。目前用户画像广泛用于个性化推荐,猜你喜欢等;针对旅游市场,携程更将其应用于“房型排序”“机票排序”“客服投诉”等诸多特色领域。 本文将从目的,架构、组成等几方面,带你了解携程在该领域的实践。 1.携程为什么做用户画像...

打包framework 涉及到得架构问题

一、在项目开发过程中 ,为了适配不通的设备 ,需要我们手动的增加支持设备的架构。那么就需要我们对苹果手机对应的架构所有了解 现在列出目前需要适配的集中机型对应的架构 上图中还少一个基于模拟器的x86_64位得架构 我们在打包framework时,不可避免的要对架构进行设置。如下图: 现对上诉字段进行说明:一下文字引用自网页:http://www.tu...

前后端分离实践(一)

前言 最近这一段时间由于Nodejs的逐渐成熟和日趋稳定,越来越多的公司中的前端团队开始尝试使用Nodejs来练一下手,尝一尝鲜。 一般的做法都是将原本属于后端的一部分相对于业务不是很重要的功能迁移到Nodejs上面来,也有一些公司将NodeJS作为前后端分离的一个解决方案去施行。而像淘宝网这类的大型网站也很早的完成了前后端的分离,给我们这样的后来者提供了...

十六、源码部署EFK之快乐没有了

一、事情起因 我看的老男孩76期ELK课程的day106缺了第12集,这一集讲的是安装Filebeat呈现Nginx日志的过程,于是快乐没有了。 因为课程缺失,我看了很多关于Filebeat的博文,但由于版本差异,配置文件也有些许差异;同时还有架构上的不同,导致各式各样的安装方式;为了少走弯路同时更加深入的了解ELK的部署以及运行,我花了两天时间看完了千锋...

Hazelcast介绍

Hazelcast介绍 什么时侯需要用例 内存中分布式计算 场景分布式消息 特性 全景 Distributed Maps 一个结点中分区 Hazelcast中的分片也称为分区,Hazelcast默认271个分区。Hazlecast通常也会对分区备份,并将副本分布到集群的不同节点上,通过数据冗余提高可靠性,这种数据的存储方式和kafka...

电商之下:服务类商品订单履约系统如何设计

电商之下,我们几乎能从电商平台上买到任何我们日常需要的商品,但是对于很多商品来说,用户购买发货后,只是整个交易流程开始的第一步,后续商家提供的上门服务才是整个交易过程中用户对商品和品牌感知力最强的时候,如何抓住这最后一公里实现用户体验和商品品牌效益最大化,是各个品牌商打造占领品牌核心竞争力和占领口碑的战略高地时不得不考虑的关键。本文将与你探讨以locati...