rabbitmqstomp.lua 文件内容如下
-- lua-resty-rabbitmqstomp: Opinionated RabbitMQ (STOMP) client lib
-- Copyright (C) 2013 Rohit 'bhaisaab' Yadav, Wingify
-- Opensourced at Wingify in New Delhi under the MIT License
local byte = string.byte
local concat = table.concat
local error = error
local find = string.find
local gsub = string.gsub
local insert = table.insert
local len = string.len
local pairs = pairs
local setmetatable = setmetatable
local sub = string.sub
local tcp = ngx.socket.tcp
local _M = {_VERSION = "0.2"}
local mt = { __index = _M }
local LF = "\x0a"
local EOL = "\x0d\x0a"
local NULL_BYTE = "\x00"
local STATE_CONNECTED = 1
local STATE_COMMAND_SENT = 2
local function new(self, opts)
local sock, err = tcp()
if not sock then
return nil, err
end
if opts == nil then
opts = {username = "guest", password = "guest", vhost = "/", trailing_lf = true}
end
return setmetatable({ sock = sock, opts = opts}, mt)
end
local function set_timeout(self, timeout)
local sock = self.sock
if not sock then
return nil, "not initialized"
end
return sock:settimeout(timeout)
end
local function _build_frame(self, command, headers, body)
local frame = {command, EOL}
if body then
headers["content-length"] = len(body)
end
for key, value in pairs(headers) do
insert(frame, key)
insert(frame, ":")
insert(frame, value)
insert(frame, EOL)
end
insert(frame, EOL)
if body then
insert(frame, body)
end
insert(frame, NULL_BYTE)
insert(frame, EOL)
return concat(frame, "")
end
local function _send_frame(self, frame)
local sock = self.sock
if not sock then
return nil, "not initialized"
end
return sock:send(frame)
end
local function _receive_frame(self)
local sock = self.sock
if not sock then
return nil, "not initialized"
end
local resp = nil
if self.opts.trailing_lf == nil or self.opts.trailing_lf == true then
resp = sock:receiveuntil(NULL_BYTE .. LF, {inclusive = true})
else
resp = sock:receiveuntil(NULL_BYTE, {inclusive = true})
end
local data, err, partial = resp()
return data, err
end
local function _login(self)
local headers = {}
headers["accept-version"] = "1.2"
headers["login"] = self.opts.username
headers["passcode"] = self.opts.password
headers["host"] = self.opts.vhost
local ok, err = _send_frame(self, _build_frame(self, "CONNECT", headers, nil))
if not ok then
return nil, err
end
local frame, err = _receive_frame(self)
if not frame then
return nil, err
end
-- We successfully received a frame, but it was an ERROR frame
if sub( frame, 1, len( 'ERROR' ) ) == 'ERROR' then
return nil, frame
end
self.state = STATE_CONNECTED
return frame
end
local function _logout(self)
local sock = self.sock
if not sock then
self.state = nil
return nil, "not initialized"
end
if self.state == STATE_CONNECTED then
-- Graceful shutdown
local headers = {}
headers["receipt"] = "disconnect"
sock:send(_build_frame(self, "DISCONNECT", headers, nil))
sock:receive("*a")
end
self.state = nil
return sock:close()
end
local function connect(self, ...)
local sock = self.sock
if not sock then
return nil, "not initialized"
end
local ok, err = sock:connect(...)
if not ok then
return nil, "failed to connect: " .. err
end
local reused = sock:getreusedtimes()
if reused and reused > 0 then
self.state = STATE_CONNECTED
return 1
end
return _login(self)
end
local function send(self, msg, headers)
local ok, err = _send_frame(self, _build_frame(self, "SEND", headers, msg))
if not ok then
return nil, err
end
if headers["receipt"] ~= nil then
return _receive_frame(self)
end
return ok, err
end
local function subscribe(self, headers)
return _send_frame(self, _build_frame(self, "SUBSCRIBE", headers))
end
local function unsubscribe(self, headers)
return _send_frame(self, _build_frame(self, "UNSUBSCRIBE", headers))
end
local function receive(self)
local data, err = _receive_frame(self)
if not data then
return nil, err
end
local idx = find(data, "\n\n", 1)
return sub(data, idx + 2)
end
local function set_keepalive(self, ...)
local sock = self.sock
if not sock then
return nil, "not initialized"
end
if self.state ~= STATE_CONNECTED then
return nil, "cannot be reused in the current connection state: "
.. (self.state or "nil")
end
self.state = nil
return sock:setkeepalive(...)
end
local function get_reused_times(self)
local sock = self.sock
if not sock then
return nil, "not initialized"
end
return sock:getreusedtimes()
end
local function close(self)
return _logout(self)
end
_M.new = new
_M.set_timeout = set_timeout
_M.connect = connect
_M.send = send
_M.subscribe = subscribe
_M.unsubscribe = unsubscribe
_M.receive = receive
_M.set_keepalive = set_keepalive
_M.get_reused_times = get_reused_times
_M.close = close
local class_mt = {
-- to prevent use of casual module global variables
__newindex = function (table, key, val)
error('attempt to write to undeclared variable "' .. key .. '"')
end
}
setmetatable(_M, class_mt)
return _M
放到 openresty-1.21.4.1-win64\lualib\resty 路径下
获取和源文件放在一起也可以
使用这个需要mq开启
stomp 这个协议的支持
最后修改于 2023-10-18 19:44:56
如果觉得我的文章对你有用,请随意赞赏
扫一扫支付

