db.postgres: implement wait() and post() based on Postgres notifications#428
Open
starius wants to merge 5 commits intoleafo:masterfrom
Open
db.postgres: implement wait() and post() based on Postgres notifications#428starius wants to merge 5 commits intoleafo:masterfrom
starius wants to merge 5 commits intoleafo:masterfrom
Conversation
Postgres supports publish-subscribe feature. One client listens on a channel
(provided as a string), while another (or same) client send a notification.
It can be used to send notifications accross machines and nginx workers.
The notification can be delivered with any read operation. To decouple
receiving notifications from reading results of normal queries, a
background handler with independent database socket is created. The
handler starts two light threads: a reader and a writer. The reader reads
new notifications from the database. The writer pushes commands "listen"
and "unlisten" to the database. Signals to/from the background handler
are delivered using openresty's semaphores.
Two new methods of "lapis.db.postgres" (and "lapis.db") were added:
* wait(channel) -- waits on the channel until it is notified with post().
Returns a payload passed by caller of post().
* post(channel, payload) -- notifies the channel with the payload.
All waiting light threads in nginx worker share the same database socket.
All light threads waiting on the same channel use the same semaphore
object which is triggered by reader light thread of the background handler.
When new channel is started being listened, it is provided to writer light
thread of the background handler and it sends command "listen channel".
When a notification is received, the corresponding channel is "unlistened"
by writer light thread and the corresponding semaphore is removed. It is
needed to prevent resource leaks in nginx worker and in database worker.
All methods are 100% non-blocking.
Notifications add zero overhead when not used.
Example:
location /wait {
default_type text/html;
lua_check_client_abort on;
content_by_lua '
local channel = ngx.req.get_uri_args().channel
while true do
ngx.say(require("lapis.db").wait(channel))
ngx.flush()
end
';
}
location /post {
default_type text/html;
content_by_lua '
local channel = ngx.req.get_uri_args().channel
local payload = ngx.req.get_uri_args().payload
require("lapis.db").post(channel, payload)
';
}
$ curl http://localhost:8080/wait?channel=foo
In another terminal:
$ curl 'http://localhost:25516/post?channel=foo&payload=bar'
The first terminal should output "bar".
In general, read operation is limited by settings of the server or firewall. In case of errors in reader or writer, new background Lua handler runs. All channels being listened are re-listened. Timeout of socket is also set to 5 minutes to have a value.
In previous implementation, resources were leaked in case of disconnection of all clients (sema:count() == 0) and no notification on the channel. The entry of "queues" in nginx and the channel listener in postres were leaked. Function cleanup_queues was introduced. It checks all queues and removes unused items and unlistens corresponding channels. It is called when reader light thread detects read timeout (after 5 minutes of no notifications) or once in 10000 received notifications. Relistening channels after restart of the background Lua handler was moved to function listen_queues.
Instead of checking all channels once in a while, put them to weak table and unlisten in a finalizer.
Send UNLISTEN command after a channel being unused for some time.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Postgres supports publish-subscribe feature. One client listens on a channel
(provided as a string), while another (or same) client send a notification.
It can be used to send notifications accross machines and nginx workers.
The notification can be delivered with any read operation. To decouple
receiving notifications from reading results of normal queries, a
background handler with independent database socket is created. The
handler starts two light threads: a reader and a writer. The reader reads
new notifications from the database. The writer pushes commands "listen"
and "unlisten" to the database. Signals to/from the background handler
are delivered using openresty's semaphores.
Two new methods of "lapis.db.postgres" (and "lapis.db") were added:
Returns a payload passed by caller of post().
All waiting light threads in nginx worker share the same database socket.
All light threads waiting on the same channel use the same semaphore
object which is triggered by reader light thread of the background handler.
When new channel is started being listened, it is provided to writer light
thread of the background handler and it sends command "listen channel".
When a notification is received, the corresponding channel is "unlistened"
by writer light thread and the corresponding semaphore is removed. It is
needed to prevent resource leaks in nginx worker and in database worker.
All methods are 100% non-blocking.
Notifications add zero overhead when not used.
Example:
In another terminal:
The first terminal should output "bar".
This pull request depends on leafo/pgmoon#28