Postgres’s publish-subscribe features made better with JSON
If you need a database feature, Postgres probably has it.
Recently, in versions 9.3 and 9.4, the Postgres devs have added JSON support in the form of JSON types, functions, and operators.
JSON functionality ends up being pretty handy for REST APIs, which I will get into in a later post, but it also has some other uses.
You may not have known this, but Postgres has publish-subscribe functionality in the form of NOTIFY, LISTEN, and UNLISTEN. This is commonly used for sending notifications that table rows have changed.
Unfortunately, the NOTIFY payload is merely text, meaning that structured data will need to be encoded somehow.
json_build_object to the rescue!
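CREATE OR REPLACE FUNCTION table_update_notify() RETURNS trigger AS $$
DECLARE
  id bigint;
BEGIN
  -- INSERT and UPDATE populate NEW; DELETE only populates OLD.
  IF TG_OP = 'INSERT' OR TG_OP = 'UPDATE' THEN
    id = NEW.id;
  ELSE
    id = OLD.id;
  END IF;
  -- NOTIFY payloads are text, so build the JSON and cast it.
  PERFORM pg_notify('table_update', json_build_object('table', TG_TABLE_NAME, 'id', id, 'type', TG_OP)::text);
  -- The return value of an AFTER row trigger is ignored.
  RETURN NULL;
END;
$$ LANGUAGE plpgsql;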
DROP TRIGGER IF EXISTS users_notify_update ON users;
CREATE TRIGGER users_notify_update AFTER UPDATE ON users FOR EACH ROW EXECUTE PROCEDURE table_update_notify();
DROP TRIGGER IF EXISTS users_notify_insert ON users;
CREATE TRIGGER users_notify_insert AFTER INSERT ON users FOR EACH ROW EXECUTE PROCEDURE table_update_notify();
DROP TRIGGER IF EXISTS users_notify_delete ON users;
CREATE TRIGGER users_notify_delete AFTER DELETE ON users FOR EACH ROW EXECUTE PROCEDURE table_update_notify();
Here we’ve created a stored procedure that is meant to be called from a TRIGGER.
Since INSERT populates only the NEW row object and DELETE populates only OLD, the stored procedure checks the operation with an IF/ELSE statement.
Then it simply calls pg_notify on a channel, using json_build_object to compose the structured data as JSON.
Finally, we create a trigger for each operation on a given table, mapping it to our stored procedure.
Now we can use LISTEN in our language of choice, and parse the payload we created as JSON.
Here is a quick example in Node.js:
var pg = require('pg');

// A dedicated (non-pooled) client keeps the LISTEN connection open.
var client = new pg.Client("postgres://localhost/fritzy");
client.connect(function (err) {
  if (err) {
    console.log(err);
    return; // without a connection there is nothing to listen on
  }
  client.on('notification', function (msg) {
    if (msg.name === 'notification' && msg.channel === 'table_update') {
      var pl = JSON.parse(msg.payload);
      console.log("*========*");
      Object.keys(pl).forEach(function (key) {
        console.log(key, pl[key]);
      });
      console.log("-========-");
    }
  });
  client.query("LISTEN table_update");
});
Here, I’m iterating through the object simply to show that it is structured.
Now, no matter where a change to a table with this TRIGGER comes from, you’ll receive JSON updates when you LISTEN.
> INSERT INTO users (username) values ('fritzy');
*========*
type INSERT
id 1
table users
-========-
> UPDATE users SET email='fritzy@andyet.com' WHERE id=1;
*========*
type UPDATE
id 1
table users
-========-
> DELETE FROM users WHERE id=1;
*========*
type DELETE
id 1
table users
-========-
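The listener above calls JSON.parse directly, which throws if a payload is ever malformed. A minimal sketch of a more defensive parser might look like the following. The helper name parseTableUpdate is hypothetical, and it assumes the payload carries the table, id, and type keys built by the trigger earlier in this post.

```javascript
// Hypothetical helper: turn a pg 'notification' message into a plain object,
// or return null when the message is not one we can use.
function parseTableUpdate(msg) {
  if (!msg || msg.channel !== 'table_update' || typeof msg.payload !== 'string') {
    return null;
  }
  var pl;
  try {
    pl = JSON.parse(msg.payload);
  } catch (e) {
    return null; // payload was not valid JSON
  }
  // Expect the three keys that the trigger's json_build_object call produces.
  if (pl === null || typeof pl !== 'object' ||
      typeof pl.table !== 'string' || typeof pl.type !== 'string' ||
      pl.id === undefined) {
    return null;
  }
  return pl;
}
```

Inside the 'notification' handler, you could call parseTableUpdate(msg) and skip the message when it returns null, so one bad payload does not take down the listening process.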
Other Use Cases
@fritzy interesting! Is there a way to get the actual change data? (a la CouchDB)
— Pe̛dr͘o̧ Tèi͜xei҉r͞ (@pgte) April 6, 2015
Yeah Pedro, there’s quite a bit we can do with the NEW and OLD rows in a trigger.
If you’d like to send the entire row, you can do something like:
PERFORM pg_notify('table_update', row_to_json(NEW)::text);
Or even:
PERFORM pg_notify('table_update', json_build_object('table', TG_TABLE_NAME, 'id', id, 'type', TG_OP, 'row', row_to_json(NEW))::text);
But remember that you’ll want to check the operation to decide whether to use NEW or OLD.
If you wanted to get really fancy, you could create a DIFF from the OLD row to the NEW row on updates, and create a change feed like CouchDB.
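One way to sketch the diff idea is to do it on the listening side rather than in the trigger. The example below assumes a hypothetical payload in which the trigger sends both rows, as 'old' and 'new' keys built with row_to_json. Neither those keys nor the diffRows helper are part of the trigger shown earlier.

```javascript
// Hypothetical helper: list the columns whose values differ between two rows.
// Assumes both rows are plain objects parsed from row_to_json output.
function diffRows(oldRow, newRow) {
  var changes = {};
  var seen = Object.create(null); // no inherited keys to trip over
  Object.keys(oldRow).concat(Object.keys(newRow)).forEach(function (key) {
    if (seen[key]) return;
    seen[key] = true;
    // Compare via JSON so nested json/array columns are compared by value.
    if (JSON.stringify(oldRow[key]) !== JSON.stringify(newRow[key])) {
      changes[key] = { old: oldRow[key], new: newRow[key] };
    }
  });
  return changes;
}

// Assumed payload shape for an UPDATE; the 'old' and 'new' keys are illustrative.
var payload = {
  table: 'users', id: 1, type: 'UPDATE',
  old: { id: 1, username: 'fritzy', email: null },
  new: { id: 1, username: 'fritzy', email: 'fritzy@andyet.com' }
};
console.log(diffRows(payload.old, payload.new));
```

For this payload only the email column is reported as changed. Sending both rows makes the payload larger, so computing the diff inside the trigger may be the better choice for wide tables.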
@fritzy so i was thinking about using pg_notify() last week, but you introduce entropy into the system when the listening agent fails
— Tom Santero (@tsantero) April 6, 2015
Good point, Tom. What if you are depending on reliable messaging for something like invalidating a cache?
If your listening process flakes out, you’d be in trouble.
You could mitigate this risk by having your INSERT/UPDATE/DELETE triggers INSERT into a log table, and NOTIFYing on INSERT to that log table instead.
Then, when your LISTEN-cache-invalidate process reconnects, it can check the log table for missed messages. You could use the JSON or JSONB type in your log table for easy parsing.
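The catch-up step could be sketched roughly like this. The table name notify_log, its id and payload columns, and the replayMissed helper are all assumptions for illustration. The client argument is anything with a promise-returning query(text, values) method that resolves to an object with a rows array, such as a connected pg client.

```javascript
// Hypothetical catch-up helper: fetch log rows newer than the last one we
// handled and pass each payload to the same handler used for live notifications.
// Assumes a log table notify_log(id, payload) with an increasing id.
function replayMissed(client, lastSeenId, handle) {
  return client
    .query('SELECT id, payload FROM notify_log WHERE id > $1 ORDER BY id', [lastSeenId])
    .then(function (result) {
      result.rows.forEach(function (row) {
        handle(row.payload); // a json/jsonb column arrives already parsed in pg
        lastSeenId = row.id;
      });
      return lastSeenId; // persist this so the next reconnect starts from here
    });
}
```

On reconnect, you would issue LISTEN first and then call replayMissed with the last id you stored, so that nothing slips through between the two steps. Under concurrent writers, rows can become visible out of id order, so a production version would likely need more care than this sketch shows.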
Please let me know via Email or Twitter @fritzy if you have any questions or thoughts.