# Source code for postgresql_audit.migrations

import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB

from .expressions import jsonb_change_key_name


def get_activity_table(schema=None):
    """
    Build a minimal description of the ``activity`` table.

    The table is bound to a fresh ``MetaData`` object on every call and
    declares only the columns listed below.

    :param schema: Optional name of schema to use.
    :returns: A ``sqlalchemy.Table`` object named ``activity``.
    """
    columns = [
        sa.Column('id', sa.Integer, primary_key=True),
        sa.Column('table_name', sa.String),
        sa.Column('verb', sa.String),
        sa.Column('old_data', JSONB),
        sa.Column('changed_data', JSONB),
    ]
    return sa.Table('activity', sa.MetaData(), *columns, schema=schema)


def alter_column(conn, table, column_name, func, schema=None):
    """
    Run given callable against given table and given column in activity
    table jsonb data columns. This function is useful when you want to
    reflect type changes in your schema to activity table.

    In the following example we change the data type of User's age column
    from string to integer.

    ::

        import sqlalchemy as sa
        from alembic import op
        from postgresql_audit import alter_column


        def upgrade():
            op.alter_column(
                'user',
                'age',
                type_=sa.Integer
            )

            alter_column(
                op,
                'user',
                'age',
                lambda value, activity_table: sa.cast(value, sa.Integer)
            )

    :param conn:
        An object that is able to execute SQL (either SQLAlchemy Connection,
        Engine or Alembic Operations object)
    :param table:
        The table to run the column name changes against
    :param column_name:
        Name of the column to run callable against
    :param func:
        A callable to run against specific column in activity table jsonb
        data columns. The callable should take two parameters the jsonb value
        corresponding to given column_name and activity table object.
    :param schema:
        Optional name of schema to use.
    :returns:
        Whatever ``conn.execute`` returns for the generated UPDATE statement.
    """
    activity_table = get_activity_table(schema=schema)

    def with_converted_value(data_column):
        # Build a single-key JSON object {column_name: func(...)} and combine
        # it with the existing data column using ``+``.
        converted = func(data_column[column_name], activity_table)
        return data_column + sa.cast(
            sa.func.json_build_object(column_name, converted),
            JSONB
        )

    # NOTE(review): the expression is applied to every activity row of the
    # table, including rows whose jsonb data has no ``column_name`` key --
    # confirm that is intended for the callables you pass in.
    query = (
        activity_table
        .update()
        .values(
            old_data=with_converted_value(activity_table.c.old_data),
            changed_data=with_converted_value(activity_table.c.changed_data),
        )
        .where(activity_table.c.table_name == table)
    )
    return conn.execute(query)
def change_column_name(
    conn,
    table,
    old_column_name,
    new_column_name,
    schema=None
):
    """
    Changes given `activity` jsonb data column key. This function is useful
    when you want to reflect column name changes to activity table.

    ::

        from alembic import op
        from postgresql_audit import change_column_name


        def upgrade():
            op.alter_column(
                'my_table',
                'my_column',
                new_column_name='some_column'
            )

            change_column_name(op, 'my_table', 'my_column', 'some_column')

    :param conn:
        An object that is able to execute SQL (either SQLAlchemy Connection,
        Engine or Alembic Operations object)
    :param table:
        The table to run the column name changes against
    :param old_column_name:
        Name of the column to change
    :param new_column_name:
        New column name
    :param schema:
        Optional name of schema to use.
    :returns:
        Whatever ``conn.execute`` returns for the generated UPDATE statement.
    """
    activity_table = get_activity_table(schema=schema)

    # The same key rename is applied to both jsonb data columns of every
    # activity row recorded for ``table``.
    query = (
        activity_table
        .update()
        .values(
            old_data=jsonb_change_key_name(
                activity_table.c.old_data,
                old_column_name,
                new_column_name
            ),
            changed_data=jsonb_change_key_name(
                activity_table.c.changed_data,
                old_column_name,
                new_column_name
            )
        )
        .where(activity_table.c.table_name == table)
    )
    return conn.execute(query)
def add_column(conn, table, column_name, default_value=None, schema=None):
    """
    Adds given column to `activity` table jsonb data columns.

    In the following example we reflect the changes made to our schema to
    activity table.

    ::

        import sqlalchemy as sa
        from alembic import op
        from postgresql_audit import add_column


        def upgrade():
            op.add_column('article', sa.Column('created_at', sa.DateTime()))
            add_column(op, 'article', 'created_at')

    :param conn:
        An object that is able to execute SQL (either SQLAlchemy Connection,
        Engine or Alembic Operations object)
    :param table:
        The table to add the column to
    :param column_name:
        Name of the column to add
    :param default_value:
        The default value of the column
    :param schema:
        Optional name of schema to use.
    :returns:
        Whatever ``conn.execute`` returns for the generated UPDATE statement.
    """
    activity_table = get_activity_table(schema=schema)
    data = {column_name: default_value}
    query = (
        activity_table
        .update()
        .values(
            # old_data gains the new key only when its text form differs
            # from '{}'; every other row is written as an empty object.
            old_data=sa.case(
                (
                    sa.cast(activity_table.c.old_data, sa.Text) != '{}',
                    activity_table.c.old_data + data
                ),
                else_=sa.cast({}, JSONB)
            ),
            # changed_data gains the new key only when it is non-empty and
            # the row's verb is not 'update'; otherwise it is left as is.
            changed_data=sa.case(
                (
                    sa.and_(
                        sa.cast(
                            activity_table.c.changed_data,
                            sa.Text
                        ) != '{}',
                        activity_table.c.verb != 'update'
                    ),
                    activity_table.c.changed_data + data
                ),
                else_=activity_table.c.changed_data
            ),
        )
        .where(activity_table.c.table_name == table)
    )
    return conn.execute(query)
def remove_column(conn, table, column_name, schema=None):
    """
    Removes given `activity` jsonb data column key. This function is useful
    when you are doing schema changes that require removing a column.

    Let's say you've been using PostgreSQL-Audit for a while for a table
    called article. Now you want to remove one audited column called
    'created_at' from this table.

    ::

        from alembic import op
        from postgresql_audit import remove_column


        def upgrade():
            op.drop_column('article', 'created_at')
            remove_column(op, 'article', 'created_at')

    :param conn:
        An object that is able to execute SQL (either SQLAlchemy Connection,
        Engine or Alembic Operations object)
    :param table:
        The table to remove the column from
    :param column_name:
        Name of the column to remove
    :param schema:
        Optional name of schema to use.
    :returns:
        Whatever ``conn.execute`` returns for the generated UPDATE statement.
    """
    activity_table = get_activity_table(schema=schema)

    # Explicit TEXT cast so the right-hand operand of ``-`` is typed as text
    # rather than left as an untyped bind parameter.
    remove = sa.cast(column_name, sa.Text)
    query = (
        activity_table
        .update()
        .values(
            old_data=activity_table.c.old_data - remove,
            changed_data=activity_table.c.changed_data - remove,
        )
        .where(activity_table.c.table_name == table)
    )
    return conn.execute(query)
def rename_table(conn, old_table_name, new_table_name, schema=None):
    """
    Renames given table in activity table. You should remember to call this
    function whenever you rename a versioned table.

    Every activity row whose ``table_name`` equals `old_table_name` gets its
    ``table_name`` set to `new_table_name`.

    ::

        from alembic import op
        from postgresql_audit import rename_table


        def upgrade():
            op.rename_table('article', 'article_v2')
            rename_table(op, 'article', 'article_v2')

    :param conn:
        An object that is able to execute SQL (either SQLAlchemy Connection,
        Engine or Alembic Operations object)
    :param old_table_name:
        The name of table to rename
    :param new_table_name:
        New name of the renamed table
    :param schema:
        Optional name of schema to use.
    :returns:
        Whatever ``conn.execute`` returns for the generated UPDATE statement.
    """
    activity_table = get_activity_table(schema=schema)
    query = (
        activity_table
        .update()
        .values(table_name=new_table_name)
        .where(activity_table.c.table_name == old_table_name)
    )
    return conn.execute(query)