Pub/Sub Intro

<< XINFO HyperLogLog Introduction >>

소개

메시지 보내기, 받기

일반적인 데이터베이스와는 다르게 레디스는 메시지를 주고, 받는 기능을 제공합니다.   Publish 명령으로 보내고, Subscribe 명령으로 받습니다.
통로는 채널(channel)을 이용합니다.   채널은 "SET KEY VALUE"에서 사용하는 'KEY'와 같은 것으로 생각하면 됩니다.
방법은 클라이언트1에서 subscribe channel_name를 실행하고, 클라이언트2에서 publish channel_name "Message"를 실행하면, 클라이언트1에 "Message"가 나옵니다.
레디스의 Pub/Sub 시스템은 메시지를 보관(queuing) 하지 않습니다.   Publish 하는 시점에 이미 실행한 subscribe 명령으로 대기하고 있는 클라이언트들에게만 전달됩니다.

이제 명령을 하나씩 살펴보도록 하겠습니다.

명령 설명

SUBSCRIBE channel [channel ...]

지정한 채널로 보내진 메시지를 받습니다.   채널을 여러 개 지정할 수 있습니다.

    6379> subscribe ch01 ch02
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "ch01"
    3) (integer) 1
    1) "subscribe"
    2) "ch02"
    3) (integer) 2

다른 클라이언트에서 PUBLISH ch01 "Hello!" 명령으로 메시지를 보내면, 다음과 같이 받는다.

    1) "message"
    2) "ch01"
    3) "Hello!"

PSUBSCRIBE pattern [pattern ...]

채널을 패턴으로 등록한다. 패턴을 여러 개 등록할 수 있다.
패턴은 아래와 같은 glob-style을 지원한다.

    • '?'는 한 글자를 대치한다.   h?llo는 hello, hallo, hxllo 같은 것을 의미한다.
    • '*'은 공백이나 여러 글자를 대치한다.   h*llo는 hllo, heeeello 같은 것을 의미한다.
    • h[ae]llo는 'a' 나 'e'만 올 수 있다.   그래서 hello, hallo는 되고, hillo는 안된다.

패턴 등록 명령 예

    6379> psubscribe ch*
    Reading messages... (press Ctrl-C to quit)
    1) "psubscribe"
    2) "ch*"
    3) (integer) 1

다른 클라이언트에서 PUBLISH ch01 "Hello! AlphaGo" 명령으로 메시지를 보내면, 다음과 같이 받는다.

    1) "pmessage"
    2) "ch*"
    3) "ch01"
    4) "Hello! AlphaGo"

UNSUBSCRIBE [channel [channel ...]]

SUBSCRIBE로 등록한 채널을 삭제해서, 더 이상 메시지를 받지 않습니다.
채널명을 입력하지 않으면 해당 클라이언트에 등록된 모든 채널을 삭제합니다.
입력하면 입력한 채널명만 삭제합니다.
redis-cli에서는 Ctrl-C로 종료합니다.

    6379> unsubscribe ch*
    1) "unsubscribe"
    2) "ch*"
    3) (integer) 1

PUNSUBSCRIBE [pattern [pattern ...]]

PSUBSCRIBE로 등록한 패턴을 삭제해서, 더 이상 메시지를 받지 않습니다.
패턴명을 입력하지 않으면 해당 클라이언트에 등록된 모든 패턴을 삭제합니다.
입력하면 입력한 패턴명만 삭제합니다.
redis-cli에서는 Ctrl-C로 종료합니다.

    6379> punsubscribe ch*
    1) "punsubscribe"
    2) "ch*"
    3) (integer) 1

PUBLISH channel message

메시지를 지정한 채널로 보냅니다.
메시지를 받는 클라이언트 수를 리턴합니다.

    6379> publish ch01 "Hello! AlphaGo"
    (integer) 1

PUBSUB subcommand [argument [argument ...]]

PUBSUB 명령은 서버에 등록된 채널이나 패턴을 조회한다.
세 가지 subcommand가 있다.   channels, numsub, numpat입니다.
channels, numsub은 채널 관련 명령이고, numpat은 패턴 관련 명령이다.

PUBSUB channels [pattern]

Pattern을 입력하지 않으면 해당 서버에 등록된 모든 채널명을 보여준다.
여기서 pattern은 psubscribe로 등록한 pattern이 아니고, 채널명을 glob-style로 찾는 것이다.
등록한 pattern을 보여주는 것은 아니다.
클라이언트에서 같은 이름의 채널을 여러 번 등록해도 이 명령에서는 한 번만 보여준다.

    6379> pubsub channels
    1) "ch01"
    2) "ch02"

PUBSUB numsub [channel-1 ... channel-n]

해당 채널에 등록된 클라이언트 개수를 보여준다.
PUBSUB channels로 확인한 채널 각각에 몇 개의 클라이언트가 연결되어 있는지 보여준다.

    6379> pubsub numsub ch01 ch02
    1) "ch01"
    2) (integer) 3
    3) "ch02"
    4) (integer) 1

PUBSUB numpat

서버에 등록된 pattern의 개수를 보여준다.

    6379> pubsub numpat
    (integer) 3

Pub/Sub와 클러스터

클러스터에서 Publish하면 클론을 포함한 모든 노드에게 보낸다. 따라서 마스터에서 publish한 메시지를 클론(슬레이브)에서 subscribe할 수 있다.  마스터 뿐만 아니라 클론에서도 publish할 수 있다. 클론에서 publish한 메시지를 다른 클론에서 subscribe할 수 있다.  Pub/Sub를 중요하게 사용한다면 클러스터를 사용하세요.

내부 구조 Internals

데이터 구조 Data Structures

Pub/Sub 시스템은 레디스 서버와 클라이언트에 채널과 패턴을 등록한다.
우선 간단히 설명하면 채널은 dict(Hash table)에 저장되고 패턴은 링크드 리스트에 저장된다.

먼저 채널을 저장하는 dict(Hash table)을 살펴보자.

채널을 저장하는 서버 Dict(Hash table) 데이터 구조

redis pubsub server channels data structure
    그림 1-1   Redis Server Dict(Hash Table) Data structure for channels

서버 구조체(redisServer struct)에는 pubsub_channels 필드와 pubsub_patterns 필드가 있다.   Pubsub_channels 필드는 채널을 저장하는 dict 구조체를 가리킨다.   Dict 구조체부터 dictEntry까지는 레디스에서 사용하는 해시 테이블이다.
DictEntry의 key 필드가 channel을 가리킨다.   한 channel을 여러 클라이언트가 subscribe 할 수 있으므로 dictEntry의 value 필드는 리스트(linked list)를 가리킨다.   리스트는 여러 개의 listNode를 가지고 각 노드는 클라이언트를 가리킨다.
PUBLISH channel message 명령은 먼저 channel을 Hash table에서 channel을 찾고, 리스트에 저장되어 있는 클라이언트들에게 하나씩 메시지를 보낸다.   그다음 패턴을 등록한 클라이언트들에게 메시지를 보낸다.
패턴을 저장하는 링크드 리스트 구조는 다음과 같다.


패턴을 저장하는 서버 링크드 리스트 데이터 구조

redis pubsub server patterns data structure
    그림 1-2   Redis Server Linked List Data structure for patterns

서버 구조체의 pubsub_patterns 필드는 패턴을 저장하는 리스트를 가리킨다.   리스트의 각 노드는 pubsubPattern 구조체를 가리키고, 이 구조체는 클라이언트와 패턴을 가진다.
PUBLISH channel message 명령은 먼저 channel 명으로 클라이언트에 메시지를 보낸 다음, channel에 해당하는 패턴을 찾아 해당 클라이언트에 메시지를 보낸다.


채널을 저장하는 클라이언트 Dict(Hash table) 데이터 구조

redis pubsub client channels data structure
    그림 1-3   Redis Client Dict(Hash Table) Data structure for channels

클라이언트 구조체(redisClient struct)에도 서버 구조체와 같이 pubsub_channels 필드와 pubsub_patterns 필드가 있다.   클라이언트 구조체의 pubsub_channels 필드도 채널을 저장하는 dict 구조체를 가리킨다.
Dict 구조체부터 dictEntry까지는 서버 구조체일 때와 같고 dictEntry의 key 필드가 channel을 가리키는 것까지 서버 때와 같다.   하지만 클라이언트는 value를 가지고 있지 않다.
Subscribe 명령이 수행되면 클라이언트 구조체에 채널을 저장하고, 서버 구조체에도 채널을 저장한다.   Publish 명령이 수행되면 해당 채널을 찾아 클라이언트에 메시지를 보내는 것은 서버 구조체를 사용한다.
클라이언트 구조체는 클라이언트가 해당 채널을 등록하고, 클라이언트를 pubsub 모드로 전환하는 역할을 한다.   클라이언트가 normal 모드면 client buffer가 무제한이나 pubsub 모드이면 hard limit은 32mb, soft limit는 8mb로 제한한다.   Unsubscribe 또는 punsubscribe 명령이 수행되어 클라이언트 구조체에 채널이나 패턴이 하나도 없으면 클라이언트를 pubsub 모드에서 normal 모드로 전환한다.


패턴을 저장하는 클라이언트 링크드 리스트 데이터 구조

redis pubsub client patterns data structure
    그림 1-4   Redis Client Linked List Data structure for patterns

클라이언트 구조체의 pubsub_patterns 필드는 패턴을 저장하는 리스트를 가리킨다.   리스트의 각 노드는 서버와 달리 pattern 구조체를 바로 가리킨다.
Psubscribe 명령이 수행되면 클라이언트 구조체인 리스트에 패턴을 저장하고, 서버 구조체에도 패턴을 저장하며, 클라이언트를 pubsub 모드로 전환하는 역할도 한다.

데이터 구조를 설명했으니, 다음은 functions을 설명한다.


FUNCTIONS

레디스 pubsub는 여섯 개의 명령으로 구성되어 있다.
메시지를 받는 subscribe와 psubscribe가 있고, 메시지 받기를 중지하는 unsubscribe와 punsubscribe가 있다.   메시지를 보내는 publish가 있고, 등록된 채널 리스트와 개수를 조회하고 패턴 개수를 조회하는 pubsub 명령이 있다.
하나씩 내부 구조를 살펴보자.

SUBSCRIBE

redis pubsub subscribe function
    그림 2-1   Redis pubsub SUBSCRIBE function

채널을 등록해서 메시지를 받는 명령이다.
SUBSCRIBE 명령이 수행되면 서버 구조체(redisServer struct)와 클라이언트 구조체(redisClient struct)에 채널을 등록한다.
설명하는 Function의 인수(argument) 명은 이해를 돕기 위해 원 소스와 다르게 구조체 명으로 변경한 곳이 있다.   위 그림에 나오는 function은 굵은 글씨로 표시했다.

    1. 먼저 dictAdd(client->pubsub_channels,channel,NULL)를 실행하여 redisClient.pubsub_channels의 dict 구조체에 채널명을 등록한다.   세번째 인수가 값인데, NULL을 넘김으로써 값(value)은 NULL이 들어간다.   그림 1-2를 보면 이해가 쉽다.
    2. 여기서부터는 서버 구조체에 등록하는 과정이다.
      dictFind(server.pubsub_channels,channel)를 수행하여 채널이 없으면 list = listCreate()를 수행하여 리스트를 생성한다.   Channel이 이미 등록되어 있으면, list = dictGetVal(dictEntry)를 수행해서 리스트를 얻어오고, 4번으로 간다.
    3. dictAdd(server.pubsub_channels,channel,list)을 수행해서 채널과 리스트를 dict에 등록한다.
    4. listAddNodeTail(list,client)를 수행해서 리스트에 클라이언트를 등록한다.

그림 1-1과 같이 보면 도움이 된다.


PSUBSCRIBE

redis pubsub psubscribe function
    그림 2-2   Redis pubsub PSUBSCRIBE function

패턴을 등록해서 메시지를 받는 명령이다.
PSUBSCRIBE 명령이 수행되면 서버와 클라이언트의 리스트(Linked List)에 패턴을 등록한다.

    1. listSearchKey(client->pubsub_patterns,pattern)로 패턴이 이미 있는지 확인한다.
      없으면 listAddNodeTail(client->pubsub_patterns,pattern)를 수행해서 클라이언트 리스트에 패턴을 등록한다.   그림 1-4를 보면 도움이 된다.
    2. listAddNodeTail(server.pubsub_patterns,pat)로 서버에 패턴을 등록하는데, 클라이언트에는 패턴만 등록하는 반면, 서버에는 pubsubPattern 구조체에 client와 pattern을 담아서 같이 등록한다.   왜냐하면 패턴을 등록한 클라이언트에 메시지를 보내기 위해서이다.   그림 1-2를 보면 도움이 된다.

UNSUBSCRIBE

redis pubsub unsubscribe function
    그림 2-3   Redis pubsub UNSUBSCRIBE function

등록한 채널을 삭제해서 더 이상 메시지를 받지 않도록 한다.
인수로 채널을 입력하지 않으면 pubsubUnsubscribeAllChannels()를 호출해서 클라이언트에 등록된 모든 채널을 삭제한다.   pubsubUnsubscribeAllChannels()에서 while loop를 돌면서 pubsubUnsubscribeChannel()를 호출해서 dictNect()를 이용해서 채널을 하나씩 삭제한다.
채널을 입력하면 입력한 채널 개수만큼 while loop를 돌면서 pubsubUnsubscribeChannel()를 호출해서 채널을 하나씩 삭제한다.

    1. dictDelete(client->pubsub_channels,channel)를 수행해서 클라이언트에 채널을 삭제한다.
    2. dictEntry = dictFind(server.pubsub_channels,channel)로 서버에서 dictEntry를 구한다.
    3. list = dictGetVal(dictEntry)로 value에 저장된 list를 가져온다.
    4. listNode = listSearchKey(list,client)를 수행해서 리스트 노드를 얻어온다.
    5. listDelNode(list,listNode)를 수행해서 리스트에서 노드를 삭제한다.
    6. 리스트에 노드가 하나도 없으면 dictDelete(server.pubsub_channels,channel)를 수행해서 리스트를 삭제한다.

PUNSUBSCRIBE

redis pubsub punsubscribe function
    그림 2-4   Redis pubsub PUNSUBSCRIBE function

등록한 패턴을 삭제해서 더 이상 메시지를 받지 않도록 한다.
인수로 패턴을 입력하지 않으면 pubsubUnsubscribeAllPatterns()를 호출해서 클라이언트에 등록된 모든 패턴을 삭제한다.   pubsubUnsubscribeAllPatterns()에서 while loop를 돌면서 pubsubUnsubscribePattern()를 호출해서 listDelNode()를 이용해서 채널을 하나씩 삭제한다.
채널을 입력하면 입력한 채널 개수만큼 while loop를 돌면서 pubsubUnsubscribePattern()를 호출해서 채널을 하나씩 삭제한다.

    1. 먼저 listSearchKey(client->pubsub_patterns,pattern)을 실행해서 클라이언트에서 패턴을 찾는다.
    2. listDelNode(client->pubsub_patterns,listNode)를 수행해서 패턴을 삭제한다.
    3. 다음 listSearchKey(server->pubsub_patterns,pubsubPattern)을 실행해서 서버에서 패턴을 찾는다.
    4. listDelNode(server->pubsub_patterns,listNode)를 수행해서 패턴을 삭제한다.

PUBLISH

redis pubsub publish function
    그림 2-5   Redis pubsub PUBLISH function

위에서 살펴본 4개 명령 중 앞에 2개는 메시지를 받는 것이고 뒤에 2개는 메시지를 더이상 받지 않는 명령입니다.
PUBLISH는 메시지를 보내는 명령입니다.
PUBLISH는 크게 두 부분으로 구성되어 있습니다.   하나는 서버 내에 클라이언트에게 메시지를 보내는 것이고, 다른 하나는 서버가 클러스터 모드이면 다른 서버들에게 전달해서 메시지를 각 서버에 접속해 있는 클라이언트들에게 메시지를 보내는 것입니다.
간단히 설명하면 Pub/Sub는 클러스터 모드를 지원합니다.

서버 내에서 동작 방식입니다.

    1. 먼저 서버에서 해당 채널을 찾는다. dictFind(server.pubsub_channels,channel)
    2. listNext(listNode)로 노드가 없을 때까지 While Loop를 돌면서 등록된 클라이언트에 addReplyBulk(client,message)로 메시지를 보낸다.
    3. 다음, listLength(server.pubsub_patterns)으로 확인해서 서버에 등록된 패턴이 있으면
    4. listNext(listNode)로 노드가 없을 때까지 While Loop를 돌면서 stringmatchlen() function으로 비교해서 맞으면 클라이언트에 addReplyBulk(pubsubPattern->client,message)로 메시지를 보낸다.

클러스터의 노드들에 전달하는 과정입니다.
clusterPropagatePublish()부터는 cluster.c 에 있습니다.
간단히 설명하면, 서버내에 클러서터의 서버들에 대한 정보가 dict에 저장되어 있고, dictNext()로 서버 정보를 얻어서 채널과 메시지를 보냅니다.   그럼 해당 서버는 채널과 메시지를 받아서 위와 같은 과정을 거처 클라이언트들에게 메시지를 보냅니다.


PUBSUB

redis pubsub pubsub function
    그림 2-6   Redis pubsub PUBSUB function
    • Subcommand가 channels 이면 server.pubsub_channels에서 dictNext()로 이동하면서 채널을 보여준다. 패턴이 입력되었으면 stringmatchlen()으로 패턴과 비교해서 맞는 채널들만 보여준다.
    • Subcommand가 numsub 면 해당 채널에 등록된 클라이언트 개수를 listLength(list)로 구해서 보여준다.
    • Subcommand가 numpat 면 리스트에 등록된 패턴 개수를 listLength(list)로 구해서 보여준다.

Clients for Java Jedis, Lettuce, Redisson
Clients for C Hiredis


<< XINFO Pub/Sub Introduction HyperLogLog Introduction >>

질문하거나 댓글을 보려면 클릭하세요.  댓글수 :    조회수 :

Email 답글이 올라오면 이메일로 알려드리겠습니다.