Skip to content

Latest commit

 

History

History
206 lines (155 loc) · 11.6 KB

chapter1.md

File metadata and controls

206 lines (155 loc) · 11.6 KB

Кириш

RabbitMQ бу - хабарлар воситачиси. Маъноси шуки, у хабар тайёрловчилардан хабарни олади ва уларни истеъмолчиларга етказади. Ўртада сиз берган қоидага асосан у хабарларни маршрутлаш, буфферлаш ва қатъий талаб этиш ишларини олиб боради.

RabbitMQ ва хабарлашиш умумий ҳолда қуйидаги терминларни ишлатади.

• Producing (жўнатиш) – бу жўнатишдан бошқа ҳеч нарса эмас. Хабарларни жўнатувчи дастур бу - producer(яратувчи, тайёрловчи). Биз уни кейинчалик қуйидаги кўринишда чизамиз "P":

Queue(навбат) - бу почта қутиси номи. У RabbitMQ ичида яшайди. Хабарлар RabbitMQ ва сизнинг дастурларингиз орқали ўтсада улар фақат queue (навбат) ичида сақланади. Queue ҳеч қандай чегараланишларга эга эмас. У хабарларни сақлашда сиз хоҳлаган катталикдаги чексиз буфферга ўхшайди. Кўп producer лар битта навбатдан чиқувчи хабарларни жўнатишлари ва мана шу битта навбатдан жуда кўп consumer(истеъмолчи) лар хабарларни қабул қилиб олишга уриниб кўришлари мумкин. Queue ни қуйидаги кўринишда чизамиз ва устида унинг номини ҳам келтирамиз:

Consuming (истеъмол қилиш) - бу маъно жиҳатдан қабул қилиш билан ўхшашликка эга. Consumer бу программа бўлиб, одатда ушбу программалар хабарни олиш учун кутишади. Бизнинг чизмамизда у "C" каби тасвирланган:

Шунга эътибор берингки producer, consumer ва broker лар битта мошинада бўлмасликлари керак. Ҳақиқатда эса кўп дастурлар бундай қилишмайди.

"Hello World"

(Go RabbitMQ клиентини қўллаган ҳолда)

Қўлланманинг ушбу қисмида биз Go da қуйидаги ишларни бажарувчи иккита кичик дастур ёзамиз:

  1. producer – битта хабарни жўнатувчи
  2. consumer – хабарларни қабул қилиб ва уларни чоп этувчи.

Биз бошланишига Go RabbitMQ API нинг айрим деталларига таяниб жуда содда иш қиламиз. Бу иш "Hello World" хабарини жўнатиш.

Қуйидаги диаграммада "P" – бизнинг producer ва "C" - бизнинг consumer. Ўртадаги қути – бу навбат, яъни RabbitMQ нинг consumer учун хабарларни сақлайдиган хабарлар буффери.

Go нинг RabbitMQ клиент кутубхонаси

RabbitMQ хабарлашиш учун очиқ бўлмиш умумий-мақсаддаги протокол AMQP 0.9.1 ни ишлатади. RabbitMQ учун кўпгина бошқа тиллардаги бир қанча клиентлар мавжуд. Биз ушбу қўлланмада Go amqp клиентни қўллаймиз.

Биринчи навбатда go get дан фойдаланиб amqp ни ўрнатинг:

$ go get github.com/streadway/amqp

Хозир биз amqp ни ўрнатдик. Энди сиз код ёзишингиз мумкин.

Жўнатиш

Бизнинг хабар жўнатувчимизни send.go ва хабар қабул қилувчимизни receive.go деб номлаймиз. Жўнатувчи RabbitMQ га боғланиб, битта хабар жўнатиб, сўнгра чиқиб кетади.

send.go да биз аввал кутубхонани import қилишимиз зарур:

package main

import (
  "fmt"
  "log"

  "github.com/streadway/amqp"
)

Биз яна ҳар сафар amqp ни чақирганимизда қайтариладиган қийматни текширадиган ёрдамчи функцияга муҳтожмиз:

func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
    panic(fmt.Sprintf("%s: %s", msg, err))
  }
}

Сўнгра RabbitMQ серверга боғланамиз:

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

Боғланиш сокет боғланишни абстрактлаштиради, музокаралар протоколи версияси устида қайғуради, аутентификация билан шуғулланади ва шу каби ишларни биз учун бажаради. Кейин биз асосан бажариладиган ишларни ўз ичига олувчи API яшайдиган канал яратамиз:

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

Жўнатмоқ учун биз queue ни жўнатишга эълон қилишимиз керак. Сўнгра биз queue га хабарни беришимиз мумкин:

q, err := ch.QueueDeclare(
  "hello", // ном
  false,   // давомий
  false,   // ишлатилмаётганда ўчириб ташлаш
  false,   // ўзи алоҳида
  false,   // кутмаслик
  nil,     // аргументлар
)
failOnError(err, "Failed to declare a queue")

body := "hello"
err = ch.Publish(
  "",     // алмашинув
  q.Name, // маршрутизациялаш калити
  false,  // мажбурий
  false,  // шошилинч
  amqp.Publishing {
    ContentType: "text/plain",
    Body:        []byte(body),
  })
failOnError(err, "Failed to publish a message")

Queue ни эълон қилиш бир хил – лекин, у хозирда мавжуд бўлмаса яратилади. Хабар ташкил этувчилари байтли массив. Шу сабабли сиз у ерда нимани кодлашни хоҳлаган бўлсангиз ҳаммасини қилсангиз бўлади.

Бу ерда send.go тўлиқ коди келтирилган.

Жўнатиш ишламаябди!

Агар сиз RabbitMQ ни биринчи марта ишга туширган бўлсангиз ва сиз "Sent" хабарини кўрмаётган бўлсангиз у ҳолда сиз сабабини топа олмай эҳтимол ҳайратланаётгандирсиз. Балким broker етарли даражадаги бўш жой билан ишга тушмагандир (одатда у 1Гб бўш жойга муҳтож) ва шу сабабли хабарни қабул қилиш инкор қилингандир. Broker logfile файлини ишонч хосил қилиш учун текширинг ва агар зарур бўлса лимитни камайтиринг. Қандай қилиб disk_free_limit ни ўзгартириш ҳақида конфигурациялар файли маълумотномасида келтирилган.

Қабул қилиш

Бу бизнинг жўнатувчимиз учун. RabbitMQ дан жўнатилган хабарларни қабул қилувчимиз. У битта хабарни жўнатган sender га ўхшамайди. Биз уни хабарларни қабул қилиб олиб ва ташқарига чоп этиш мақсадида эшитиб турадиган тарзда ишга туширилган ҳолатда ушлаб турамиз.

У ҳам код жўнатувчи каби (receive.go) import ва ёрдамчи функцияларга эга:

package main

import (
  "fmt"
  "log"

  "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
    panic(fmt.Sprintf("%s: %s", msg, err))
  }
}

Созлаш sender дагидек. Биз боғланиш ва канални очамиз ва навбатни хабарни қайсидан қабул қилиш керак бўлса ана шунда эълон қиламиз. Эътибор беринг қабул қилаётгандаги навбат жўнатаётган навбат номи билан мос бўлиши керак.

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

q, err := ch.QueueDeclare(
  "hello", // ном
  false,   // давомийлик
  false,   // ишлатилмаётганда ўчириш
  false,   // ўзи алоҳида
  false,   // кутмаслик
  nil,     // аргументлар
)
failOnError(err, "Failed to declare a queue")

Эътибор беринг биз бу ерда тўлдирилган queue ни эълон қилдик. Чунки биз қабул қилишни жўнатишдан олдин бошлаган бўлишимиз мумкин эди. Биз хабарни қабул қилиб олишдан олдин уни мавжудлигига ишонч хосил қилишни хоҳладик.

Биз сервердан бизга навбатдан хабарни олиб беришни сўраябмиз. Шундай қилиб у бизга хабарларни асинхрон тарзда жўнатишни бошлайди. Биз эса goroutine да канал (amqp::Consume орқали қайтарилган) дан хабарларни ўқий бошлаймиз.

msgs, err := ch.Consume(
  q.Name, // queue
  "",     // consumer
  true,   // auto-ack
  false,  // exclusive
  false,  // no-local
  false,  // no-wait
  nil,    // args
)
failOnError(err, "Failed to register a consumer")

forever := make(chan bool)

go func() {
  for d := range msgs {
    log.Printf("Received a message: %s", d.Body)
  }
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever

Бу ерда receive.go нинг тўлиқ коди.

Ҳаммасини биргаликда ишга тушириш

Хозир биз икки скриптни ишга туширишимиз мумкин. Командалар сатрида sender ни ишга туширинг:

$ go run send.go

Кейин receiver ни ишга туширинг:

$ go run receive.go

Қабул қилувчи RabbitMQ орқали жўнатувчидан олган хабарини чоп этиши керак. Қабул қилувчи ишга тушган ҳолатда қолиши керак (тўхтатиш учун Ctrl-C ни ишлатинг), шу сабабли бошқа командалар сатридан жўнатувчини яна ишга туширишга уриниб кўринг.

Агар сиз навбатларни текширишни хоҳласангиз rabbitmqctl list_queues ни қўллашга уриниб кўринг.

2 қисмга ўтиш ва онсон бўлган work queue ни қуришга вақти бўлди.