rabbitmq 和 kafka 简单的性能测试

摘要:
当Rabbitmq生成对象时,很有可能添加队列失败。报告的错误是“tcp链接重置”。没有其他问题。结论:Kafka的性能明显优于Rabbitmq。然而,这也是理所当然的事。毕竟,两个消息队列实现的协议是不同的,处理消息的场景也非常不同。Rabbitmq适用于处理一些数据严格的消息,例如支付消息、社交消息和其他不能丢失的数据。Kafka是一种批处理操作,无法验证数据是否能够完全到达消费者手中,因此它适用于具有大量营销消息的某些场景。

测试环境:ubuntu 15.10 64位

cpu:inter core i7-4790 3.60GHZ * 8

内存:16GB

硬盘:ssd 120GB

软件环境:rabbmitmq 3.6.0 kafka0.8.1 (均为单机本机运行)

PS:测试结果均为单操作测试,即生产的时候没有消费操作

测试结果:

kafka :消费速度: 37,586 /s 生产速度: 448,753 /s

rabbitmq: 消费速度: 20,807 /s 生产速度 16.413 /s

出现问题:

rabbitmq 生产4分钟左右出现队列阻塞,无法继续添加数据,1分钟后恢复,再过大约1分钟又出现此现象并以约1分钟为间隔出现此问题。

rabbitmq 生产对象时有不小的几率(约1/20)添加队列失败,报出的错误是“tcp链接重置”

其他并无任何问题

结论:

很明显的看出kafka的性能远超rabbitmq。不过这也是理所当然的,毕竟2个消息队列实现的协议是不一样的,处理消息的场景也大有不同。rabbitmq适合处理一些数据严谨的消息,比如说支付消息,社交消息等不能丢失的数据。kafka是批量操作切不报证数据是否能完整的到达消费者端,所以适合一些大量的营销消息的场景。

代码:

kafka:

package main
import (
    "github.com/Shopify/sarama"
    "os"
    "os/signal"
    "sync"
    "log"
    "time"
)
func main() {
    go producer()
//go consumer()
    time.Sleep(10*time.Minute)
}
func producer()  {
    config :=sarama.NewConfig()
    config.Producer.Return.Successes = true
    proder,err := sarama.NewAsyncProducer([]string{"localhost:9092"},config)
    if err !=nil {
        panic(err)
    }
    signals :=make(chan  os.Signal,1)
    signal.Notify(signals,os.Interrupt)
    var(
        wg                          sync.WaitGroup
        enqueued, successes, errors int
    )
    wg.Add(1)
    go func() {
        defer  wg.Done()
        for _=range proder.Successes(){
            successes++
        }
    }()
    wg.Add(1)
    go func() {
        defer wg.Done()
        for err :=range proder.Errors(){
            log.Println(err)
            errors++
        }
    }()
    go func() {
        t1 :=time.NewTicker(time.Second)
        for{
            <-t1.C
            log.Println(enqueued)
        }
    }()
    ProducerLoop:
    for{
        message :=&sarama.ProducerMessage{Topic:"test",Value:sarama.StringEncoder("testing 123")}
        select{
        case proder.Input() <-message:
            enqueued++
        case <-signals:
            proder.AsyncClose()
            breakProducerLoop
        }
    }
    wg.Wait()
    log.Println("Successfully produced:%d;errors:%d
",successes,errors)
}
func consumer()  {
    coner,err := sarama.NewConsumer([]string{"localhost:9092"},nil)
    if err !=nil {
        panic(err)
    }
    defer func() {
        if err :=coner.Close(); err !=nil{
            log.Fatalln(err)
        }
    }()
    partitionConsumer ,err := coner.ConsumePartition("test",0,sarama.OffsetNewest)
    if err !=nil {
        panic(err)
    }
    defer func() {
        if err := partitionConsumer.Close();err!=nil{
            log.Fatalln(err)
        }
    }()
    signals := make(chan os.Signal,1)
    signal.Notify(signals,os.Interrupt)
    consumed:=0
    go func() {
        t1 :=time.NewTicker(time.Second)
        for{
            <-t1.C
            log.Println(consumed)
        }
    }()
    ConsumerLoop:
    for{
        select{
        case _ = <-partitionConsumer.Messages():
            consumed++
//log.Println( string(msg.Value),"  =>  ",consumed)
        case <-signals:
            breakConsumerLoop
        }
    }
    log.Printf("Consumed: %d
", consumed)
}

rabbitmq:

package main
import (
    "github.com/streadway/amqp"
    "time"
    "fmt"
    "log"
)
const(
    queueName = "push.msg.q"
    exchange  = "t.msg.ex"
    mqurl ="amqp://shimeng:shimeng1015@192.168.155.106:5672/push"
)
var conn *amqp.Connection
var channel *amqp.Channel
func main() {
    fmt.Println(1)
//push()
receive()
//fmt.Println("end")
//close()
}
func failOnErr(err error, msg string) {
    if err !=nil {
        log.Fatalf("%s:%s", msg, err)
        panic(fmt.Sprintf("%s:%s", msg, err))
    }
}
func mqConnect() {
    varerr error
    conn, err =amqp.Dial(mqurl)
    if err !=nil {
        log.Println(1)
        log.Fatalln(err)
    }
    fmt.Println(5)
    channel, err =conn.Channel()
    if err !=nil {
        fmt.Println(2)
        log.Fatalln(err)
    }else{
        fmt.Println("a")
    }
}
func push() {
    count := 0
    if channel ==nil {
        fmt.Println(2)
        mqConnect()
    }else{
        fmt.Println(3)
    }
    msgContent := "hello world!"
    t1 :=time.NewTicker(time.Second)
    go func() {
        for{
            <-t1.C
            log.Println(count)
        }
    }()
    for{
        err := channel.Publish(exchange, "test", false, false, amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(msgContent),
        })
        if err !=nil {
        }else{
            count ++
        }
    }
}
func receive() {
    if channel ==nil {
        mqConnect()
    }
    count :=0
    msgs, err := channel.Consume(queueName, "", true, false, false, false, nil)
    failOnErr(err, "")
    forever := make(chan bool)
    t1 :=time.NewTicker(time.Second)
    go func() {
        for{
            <-t1.C
            log.Println(count)
        }
    }()
    go func() {
        //fmt.Println(*msgs)
        for _=range msgs {
            count ++
//s := BytesToString(&(d.Body))
//count++
//fmt.Printf("receve msg is :%s -- %d
", *s, count)
}
    }()
    fmt.Printf("[*] Waiting for messages. To exit press CTRL+C
")
    <-forever
}

免责声明:文章转载自《rabbitmq 和 kafka 简单的性能测试》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Linux系统休眠和设备中断处理RHEL6.6安装Oracle 11g RAC下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

华为/H3C Syslog配置

H3C交换机的设置举例1. 组网需求将系统的日志信息发送到 linux 日志主机;日志主机的IP 地址为1.2.0.1/16;信息级别高于等于 informational 的日志信息将会发送到日志主机上;日志信息的输出语言为英文,允许输出信息的模块为ARP 和 CMD。2. 组网图3. 配置步骤(1) 设备上的配置。# 开启信息中心。<Sysname...

Kafka(四) —— KafkaProducer源码阅读

一、doSend()方法 Kafka中的每一条消息都对应一个ProducerRecord对象。 public class ProducerRecord<K, V> { private final String topic; private final Integer partition; private final...

Java中的日志——Java.util.logging、log4j、commons-logging

Java中给项目程序添加log主要有三种方式,一使用JDK中的java.util.logging包,一种是log4j,一种是commons-logging。其中log4j和commons-logging都是apache软件基金会的开源项目。这三种方式的区别如下: Java.util.logging,JDK标准库中的类,是JDK 1.4 版本之后添加的日志记...

rabbitMQ Connection timed out

在VM中部署了一个rabbitMQ server ,在物理机上按照rabbitMQ官网上的 java的教程访问VM中的rabbitMQ报如下错误: Exception in thread "main" java.net.ConnectException: Connection timed out: connect at java.net.DualStac...

Rabbit MQ安装配置及常见问题

Window安装 1:RabbitMQ安装 1.1:安装Erlang:http://www.erlang.org/ 1.2:安装RabbitMQ:http://www.rabbitmq.com/download.html 1.3:window 平台配置参考:https://github.com/ServiceStack/rabbitmq-windows 1...

MySQL 8.0.14版本新功能详解

点击▲关注 “数据和云”   给公众号标星置顶 更多精彩 第一时间直达 作者:崔虎龙,云和恩墨-开源架构部-MySQL技术顾问,长期服务于数据中心(金融,游戏,物流)行业,熟悉数据中心运营管理的流程及规范,自动化运维 等方面。擅长MySQL,Redis,MongoDB 数据库高可用设计 和 运维故障处理,备份恢复,升级迁移,性能优化 。 MySQL已进...