转载

rabbitmq 和 kafka 简单的性能测试

测试环境:ubuntu 15.10 64位

cpu:inter core i7-4790 3.60GHZ * 8

内存:16GB

硬盘:ssd 120GB

软件环境:rabbmitmq 3.6.0   kafka0.8.1  (均为单机本机运行)

测试结果:

kafka :消费速度: 37,586 /s  生产速度: 448,753 /s

rabbitmq: 消费速度: 20,807 /s  生产速度  16.413 /s

出现问题:

rabbitmq 生产4分钟左右出现队列阻塞,无法继续添加数据,1分钟后恢复,再过大约1分钟又出现此现象并以约1分钟为间隔出现此问题。

rabbitmq 生产对象时有不小的几率(约 1/20)添加队列失败,报出的错误是“tcp链接重置”

其他并无任何问题

结论:

很明显的看出kafka的性能远超rabbitmq。不过这也是理所当然的,毕竟2个消息队列实现的协议是不一样的,处理消息的场景也大有不同。rabbitmq适合处理一些数据严谨的消息,比如说支付消息,社交消息等不能丢失的数据。kafka是批量操作切不报证数据是否能完整的到达消费者端,所以适合一些大量的营销消息的场景。

代码:

kafka:

 package main import (     "github.com/Shopify/sarama"     "os"     "os/signal"     "sync"     "log"     "time" )   func main() {     go producer() //    go consumer()     time.Sleep(10*time.Minute) }  func producer()  {     config :=sarama.NewConfig()     config.Producer.Return.Successes = true     proder,err := sarama.NewAsyncProducer([]string{"localhost:9092"},config)     if err != nil {         panic(err)     }      signals :=make(chan  os.Signal,1)     signal.Notify(signals,os.Interrupt)      var (         wg                          sync.WaitGroup         enqueued, successes, errors int     )      wg.Add(1)     go func() {         defer  wg.Done()         for _=range proder.Successes(){             successes++         }     }()     wg.Add(1)     go func() {         defer wg.Done()         for err := range proder.Errors(){             log.Println(err)             errors++         }     }()      go func() {         t1 := time.NewTicker(time.Second)         for{             <- t1.C             log.Println(enqueued)         }     }()      ProducerLoop:      for{         message :=&sarama.ProducerMessage{Topic:"test",Value:sarama.StringEncoder("testing 123")}         select {         case proder.Input() <- message:             enqueued++          case <- signals:             proder.AsyncClose()             break ProducerLoop         }      }      wg.Wait()     log.Println("Successfully produced:%d;errors:%d/n",successes,errors)  }  func consumer()  {     coner,err := sarama.NewConsumer([]string{"localhost:9092"},nil)     if err != nil {         panic(err)     }      defer func() {         if err :=coner.Close(); err !=nil{             log.Fatalln(err)         }     }()      partitionConsumer ,err := coner.ConsumePartition("test",0,sarama.OffsetNewest)     if err != nil {         panic(err)     }      defer func() {         if err := partitionConsumer.Close();err!=nil{             log.Fatalln(err)         }     }()         signals := make(chan os.Signal,1)     signal.Notify(signals,os.Interrupt)     consumed:=0      go func() {         t1 := time.NewTicker(time.Second)         for{             <- t1.C             log.Println(consumed)         }     }()      ConsumerLoop:     for{         select {         case _ = <-partitionConsumer.Messages():              consumed++ //            log.Println( string(msg.Value),"  =>  ",consumed)         case <-signals:             break ConsumerLoop         }     }      log.Printf("Consumed: %d/n", consumed) } 

rabbitmq:

 package main  import (     "github.com/streadway/amqp"     "time"     "fmt"     "log" )  const (     queueName = "push.msg.q"     exchange  = "t.msg.ex"     mqurl ="amqp://shimeng:shimeng1015@192.168.155.106:5672/push"  )  var conn *amqp.Connection var channel *amqp.Channel  func main() {     fmt.Println(1) //    push()     receive() //    fmt.Println("end") //    close() }  func failOnErr(err error, msg string) {     if err != nil {         log.Fatalf("%s:%s", msg, err)         panic(fmt.Sprintf("%s:%s", msg, err))     } }  func mqConnect() {     var err error     conn, err = amqp.Dial(mqurl)     if err != nil {         log.Println(1)         log.Fatalln(err)     }     fmt.Println(5)     channel, err = conn.Channel()     if err != nil {         fmt.Println(2)         log.Fatalln(err)     }else {         fmt.Println("a")     } }  func push() {     count := 0     if channel == nil {         fmt.Println(2)         mqConnect()     }else {         fmt.Println(3)     }     msgContent := "hello world!"     t1 := time.NewTicker(time.Second)      go func() {         for{             <- t1.C             log.Println(count)         }     }()      for{         err := channel.Publish(exchange, "test", false, false, amqp.Publishing{             ContentType: "text/plain",             Body:        []byte(msgContent),         })         if err != nil {          }else {             count ++         }      }  }  func receive() {     if channel == nil {         mqConnect()     }     count :=0     msgs, err := channel.Consume(queueName, "", true, false, false, false, nil)     failOnErr(err, "")      forever := make(chan bool)      t1 := time.NewTicker(time.Second)     go func() {         for{             <- t1.C             log.Println(count)         }     }()     go func() {         //fmt.Println(*msgs)         for _= range msgs {             count ++ //            s := BytesToString(&(d.Body)) //            count++ //            fmt.Printf("receve msg is :%s -- %d/n", *s, count)         }     }()      fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C/n")     <-forever } 
原文  http://www.cnblogs.com/shi-meng/p/5190980.html
正文到此结束
Loading...