测试环境:ubuntu 15.10 64位
cpu:inter core i7-4790 3.60GHZ * 8
内存:16GB
硬盘:ssd 120GB
软件环境:rabbmitmq 3.6.0 kafka0.8.1 (均为单机本机运行)
PS:测试结果均为单操作测试,即生产的时候没有消费操作
测试结果:
kafka :消费速度: 37,586 /s 生产速度: 448,753 /s
rabbitmq: 消费速度: 20,807 /s 生产速度 16.413 /s
出现问题:
rabbitmq 生产4分钟左右出现队列阻塞,无法继续添加数据,1分钟后恢复,再过大约1分钟又出现此现象并以约1分钟为间隔出现此问题。
rabbitmq 生产对象时有不小的几率(约1/20)添加队列失败,报出的错误是“tcp链接重置”
其他并无任何问题
结论:
很明显的看出kafka的性能远超rabbitmq。不过这也是理所当然的,毕竟2个消息队列实现的协议是不一样的,处理消息的场景也大有不同。rabbitmq适合处理一些数据严谨的消息,比如说支付消息,社交消息等不能丢失的数据。kafka是批量操作切不报证数据是否能完整的到达消费者端,所以适合一些大量的营销消息的场景。
代码:
kafka:
package main import ( "github.com/Shopify/sarama" "os" "os/signal" "sync" "log" "time" ) func main() { go producer() //go consumer() time.Sleep(10*time.Minute) } func producer() { config :=sarama.NewConfig() config.Producer.Return.Successes = true proder,err := sarama.NewAsyncProducer([]string{"localhost:9092"},config) if err !=nil { panic(err) } signals :=make(chan os.Signal,1) signal.Notify(signals,os.Interrupt) var( wg sync.WaitGroup enqueued, successes, errors int ) wg.Add(1) go func() { defer wg.Done() for _=range proder.Successes(){ successes++ } }() wg.Add(1) go func() { defer wg.Done() for err :=range proder.Errors(){ log.Println(err) errors++ } }() go func() { t1 :=time.NewTicker(time.Second) for{ <-t1.C log.Println(enqueued) } }() ProducerLoop: for{ message :=&sarama.ProducerMessage{Topic:"test",Value:sarama.StringEncoder("testing 123")} select{ case proder.Input() <-message: enqueued++ case <-signals: proder.AsyncClose() breakProducerLoop } } wg.Wait() log.Println("Successfully produced:%d;errors:%d ",successes,errors) } func consumer() { coner,err := sarama.NewConsumer([]string{"localhost:9092"},nil) if err !=nil { panic(err) } defer func() { if err :=coner.Close(); err !=nil{ log.Fatalln(err) } }() partitionConsumer ,err := coner.ConsumePartition("test",0,sarama.OffsetNewest) if err !=nil { panic(err) } defer func() { if err := partitionConsumer.Close();err!=nil{ log.Fatalln(err) } }() signals := make(chan os.Signal,1) signal.Notify(signals,os.Interrupt) consumed:=0 go func() { t1 :=time.NewTicker(time.Second) for{ <-t1.C log.Println(consumed) } }() ConsumerLoop: for{ select{ case _ = <-partitionConsumer.Messages(): consumed++ //log.Println( string(msg.Value)," => ",consumed) case <-signals: breakConsumerLoop } } log.Printf("Consumed: %d ", consumed) }
rabbitmq:
package main import ( "github.com/streadway/amqp" "time" "fmt" "log" ) const( queueName = "push.msg.q" exchange = "t.msg.ex" mqurl ="amqp://shimeng:shimeng1015@192.168.155.106:5672/push" ) var conn *amqp.Connection var channel *amqp.Channel func main() { fmt.Println(1) //push() receive() //fmt.Println("end") //close() } func failOnErr(err error, msg string) { if err !=nil { log.Fatalf("%s:%s", msg, err) panic(fmt.Sprintf("%s:%s", msg, err)) } } func mqConnect() { varerr error conn, err =amqp.Dial(mqurl) if err !=nil { log.Println(1) log.Fatalln(err) } fmt.Println(5) channel, err =conn.Channel() if err !=nil { fmt.Println(2) log.Fatalln(err) }else{ fmt.Println("a") } } func push() { count := 0 if channel ==nil { fmt.Println(2) mqConnect() }else{ fmt.Println(3) } msgContent := "hello world!" t1 :=time.NewTicker(time.Second) go func() { for{ <-t1.C log.Println(count) } }() for{ err := channel.Publish(exchange, "test", false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(msgContent), }) if err !=nil { }else{ count ++ } } } func receive() { if channel ==nil { mqConnect() } count :=0 msgs, err := channel.Consume(queueName, "", true, false, false, false, nil) failOnErr(err, "") forever := make(chan bool) t1 :=time.NewTicker(time.Second) go func() { for{ <-t1.C log.Println(count) } }() go func() { //fmt.Println(*msgs) for _=range msgs { count ++ //s := BytesToString(&(d.Body)) //count++ //fmt.Printf("receve msg is :%s -- %d ", *s, count) } }() fmt.Printf("[*] Waiting for messages. To exit press CTRL+C ") <-forever }