Fritzy.io

Postgres’s publish-subscribe features made better with JSON


If you need a database feature, Postgres probably has it.

Over versions 9.2 through 9.4, the Postgres devs have added JSON support in the form of JSON types, functions, and operators (json_build_object, used below, arrived in 9.4).

JSON functionality ends up being pretty handy for REST APIs, which I will get into in a later post, but it also has some other uses.

You may not have known this, but Postgres has publish-subscribe functionality in the form of NOTIFY, LISTEN, and UNLISTEN. This is commonly used for sending notifications that table rows have changed.

Unfortunately, the NOTIFY payload is merely text, meaning that structured data will need to be encoded somehow.

json_build_object to the rescue!

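CREATE OR REPLACE FUNCTION table_update_notify() RETURNS trigger AS $$
DECLARE
  id bigint;
BEGIN
  -- INSERT and UPDATE populate NEW; DELETE only populates OLD.
  IF TG_OP = 'INSERT' OR TG_OP = 'UPDATE' THEN
    id := NEW.id;
  ELSE
    id := OLD.id;
  END IF;
  -- The NOTIFY payload must be text, so build the JSON and cast it.
  PERFORM pg_notify('table_update', json_build_object('table', TG_TABLE_NAME, 'id', id, 'type', TG_OP)::text);
  -- The return value is ignored for AFTER row triggers.
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;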

DROP TRIGGER IF EXISTS users_notify_update ON users;
CREATE TRIGGER users_notify_update AFTER UPDATE ON users FOR EACH ROW EXECUTE PROCEDURE table_update_notify();

DROP TRIGGER IF EXISTS users_notify_insert ON users;
CREATE TRIGGER users_notify_insert AFTER INSERT ON users FOR EACH ROW EXECUTE PROCEDURE table_update_notify();

DROP TRIGGER IF EXISTS users_notify_delete ON users;
CREATE TRIGGER users_notify_delete AFTER DELETE ON users FOR EACH ROW EXECUTE PROCEDURE table_update_notify();

Here we’ve created a stored procedure that is meant to be called from a TRIGGER.

Not every operation populates both row objects: INSERT only sets NEW, DELETE only sets OLD, and UPDATE sets both. The stored procedure checks TG_OP with an IF/ELSE statement to pick the right one.

Then it calls pg_notify on the table_update channel, using json_build_object to compose the structured data as JSON and casting the result to text.
Finally, we create a trigger for each operation on a given table, mapping it to our stored procedure.

Now we can use LISTEN in our language of choice, and parse the payload we created as JSON.

Here is a quick example in Node.js:

var pg = require('pg');

// Use a dedicated client: a LISTEN connection has to stay open.
var client = new pg.Client("postgres://localhost/fritzy");

client.connect(function (err) {
  if (err) {
    console.log(err);
    return; // don't use the client if the connection failed
  }
  client.on('notification', function (msg) {
    if (msg.channel === 'table_update') {
      var pl = JSON.parse(msg.payload);
      console.log("*========*");
      Object.keys(pl).forEach(function (key) {
        console.log(key, pl[key]);
      });
      console.log("-========-");
    }
  });
  client.query("LISTEN table_update");
});

Here, I’m iterating through the object, simply to show that it is structured.

Now, regardless of where a change to a table with these triggers comes from, you’ll receive JSON updates when you LISTEN.

> INSERT INTO users (username) values ('fritzy');

*========*
type INSERT
id 1
table users
-========-

> UPDATE users SET email='fritzy@andyet.com' WHERE id=1;

*========*
type UPDATE
id 1
table users
-========-

> DELETE FROM users WHERE id=1;

*========*
type DELETE
id 1
table users
-========-
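
In a real application you would probably route each notification rather than print it. Here is a minimal sketch of one way to do that, assuming the payload shape produced by the trigger above (table, id, type). The names createDispatcher and dispatch are made up for this example and are not part of node-postgres. It also guards JSON.parse, since the payload is just text.

```javascript
// Hypothetical helper: routes parsed NOTIFY payloads to per-table, per-operation handlers.
// Assumes payloads shaped like {"table": "users", "id": 1, "type": "INSERT"}.
function has(obj, key) {
  return Object.prototype.hasOwnProperty.call(obj, key);
}

function createDispatcher(handlers) {
  return function dispatch(msg) {
    let pl;
    try {
      pl = JSON.parse(msg.payload);
    } catch (e) {
      return false; // payload was not valid JSON; ignore it
    }
    if (pl === null || typeof pl !== 'object') {
      return false;
    }
    if (!has(handlers, pl.table) || !has(handlers[pl.table], pl.type)) {
      return false; // nothing registered for this table/operation
    }
    const handler = handlers[pl.table][pl.type];
    if (typeof handler !== 'function') {
      return false;
    }
    handler(pl);
    return true;
  };
}

// Example wiring; these handlers only record what they see.
const seen = [];
const dispatch = createDispatcher({
  users: {
    INSERT: function (pl) { seen.push('created user ' + pl.id); },
    DELETE: function (pl) { seen.push('deleted user ' + pl.id); }
  }
});

dispatch({ channel: 'table_update', payload: '{"table":"users","id":1,"type":"INSERT"}' });
dispatch({ channel: 'table_update', payload: 'not json' }); // ignored
console.log(seen);
```

With a connected client you would hook it up with client.on('notification', dispatch).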

Other Use Cases

@fritzy interesting! Is there a way to get the actual change data? (a la CouchDB)

— Pe̛dr͘o̧ Tèi͜xei҉r͞ (@pgte) April 6, 2015

Yeah Pedro, there’s quite a bit we can do with the NEW and OLD rows in a trigger.

If you’d like to send the entire row, you can do something like:

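PERFORM pg_notify('table_update', row_to_json(NEW)::text);

Or even:

PERFORM pg_notify('table_update', json_build_object('table', TG_TABLE_NAME, 'id', id, 'type', TG_OP, 'row', row_to_json(NEW))::text);

But remember to check the operation (TG_OP) to decide whether to use NEW or OLD: NEW is not set for DELETE, and OLD is not set for INSERT. Also keep in mind that, in the default configuration, a NOTIFY payload must be shorter than 8000 bytes, so very wide rows may not fit.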

If you wanted to get really fancy, you could create a DIFF from the OLD row to NEW row on updates, and create a change feed like CouchDB.
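
As a rough sketch of that idea on the listening side: assume the UPDATE payload carries both rows under hypothetical old and new keys, built with row_to_json(OLD) and row_to_json(NEW). The function below (diffRows is a made-up name) compares top-level columns only and returns the ones that changed. You could just as well compute the diff inside the trigger; this is only one option.

```javascript
// Hypothetical helper: compares two row objects column by column (top level only).
// Values are compared via JSON.stringify so json/array columns are compared by content.
function diffRows(oldRow, newRow) {
  const changes = {};
  const columns = new Set(Object.keys(oldRow).concat(Object.keys(newRow)));
  columns.forEach(function (col) {
    if (JSON.stringify(oldRow[col]) !== JSON.stringify(newRow[col])) {
      changes[col] = { from: oldRow[col], to: newRow[col] };
    }
  });
  return changes;
}

// Assumed payload shape; the "old" and "new" keys are not in the trigger shown earlier.
const payload = {
  table: 'users',
  id: 1,
  type: 'UPDATE',
  old: { id: 1, username: 'fritzy', email: null },
  new: { id: 1, username: 'fritzy', email: 'fritzy@andyet.com' }
};

const changes = diffRows(payload.old, payload.new);
console.log(changes);
```

For this sample payload, only the email column shows up in the result. Note that sending both rows doubles the payload size, which matters given the payload limit.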


@fritzy so i was thinking about using pg_notify() last week, but you introduce entropy into the system when the listening agent fails

— Tom Santero (@tsantero) April 6, 2015

Good point, Tom. What if you are depending on reliable messaging for something like invalidating a cache?
If your listening process flakes out, you’d be in trouble.

You could mitigate this risk by having your INSERT/UPDATE/DELETE triggers INSERT into a log table, and NOTIFYing on INSERT to that log table instead.

Then, when your LISTEN-cache-invalidate process reconnects, it can check the log table for missed messages. You could use the JSON or JSONB type in your log table for easy parsing.
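
The catch-up step might look roughly like this. Everything named here is an assumption for illustration: a log table called notify_log with an increasing integer id and a payload JSON column, and a query(sql, params, callback) function shaped like node-postgres's client.query. The demo swaps in an in-memory stand-in so the logic can be followed without a database.

```javascript
// Hypothetical catch-up: fetch log rows newer than the last one we handled.
// Assumes a table notify_log(id serial, payload json) filled by the triggers.
function catchUp(query, lastSeenId, handle, done) {
  query(
    'SELECT id, payload FROM notify_log WHERE id > $1 ORDER BY id',
    [lastSeenId],
    function (err, result) {
      if (err) {
        return done(err, lastSeenId);
      }
      let newest = lastSeenId;
      result.rows.forEach(function (row) {
        handle(row.payload);
        newest = row.id; // remember how far we got
      });
      done(null, newest);
    }
  );
}

// In-memory stand-in for client.query, for demonstration only.
const fakeLog = [
  { id: 1, payload: { table: 'users', id: 1, type: 'INSERT' } },
  { id: 2, payload: { table: 'users', id: 1, type: 'UPDATE' } },
  { id: 3, payload: { table: 'users', id: 1, type: 'DELETE' } }
];
function fakeQuery(sql, params, callback) {
  const rows = fakeLog.filter(function (row) { return row.id > params[0]; });
  callback(null, { rows: rows });
}

const replayed = [];
let lastSeen = 1; // pretend log row 1 was handled before the disconnect
catchUp(fakeQuery, lastSeen, function (pl) { replayed.push(pl.type); }, function (err, newest) {
  if (err) { throw err; }
  lastSeen = newest;
});
console.log(replayed, lastSeen);
```

In a real listener you would issue LISTEN first and then run the catch-up query, so nothing slips through between the two, and you would store the last seen id somewhere that survives a restart.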


Please let me know via Email or Twitter @fritzy if you have any questions or thoughts.