Source code for examples.versioned_rows.versioned_rows_w_versionid
"""Illustrates a method to intercept changes on objects, turningan UPDATE statement on a single row into an INSERT statement, so that a newrow is inserted with the new data, keeping the old row intact.This example adds a numerical version_id to the Versioned class as wellas the ability to see which row is the most "current" version."""fromsqlalchemyimportBooleanfromsqlalchemyimportColumnfromsqlalchemyimportcreate_enginefromsqlalchemyimporteventfromsqlalchemyimportForeignKeyConstraintfromsqlalchemyimportfuncfromsqlalchemyimportIntegerfromsqlalchemyimportselectfromsqlalchemyimportStringfromsqlalchemy.ext.declarativeimportdeclarative_basefromsqlalchemy.ormimportattributesfromsqlalchemy.ormimportbackreffromsqlalchemy.ormimportcolumn_propertyfromsqlalchemy.ormimportmake_transientfromsqlalchemy.ormimportrelationshipfromsqlalchemy.ormimportSessionfromsqlalchemy.ormimportsessionmakerclassVersioned(object):# we have a composite primary key consisting of "id"# and "version_id"id=Column(Integer,primary_key=True)version_id=Column(Integer,primary_key=True,default=1)# optional - add a persisted is_current_version columnis_current_version=Column(Boolean,default=True)# optional - add a calculated is_current_version column@classmethoddef__declare_last__(cls):alias=cls.__table__.alias()cls.calc_is_current_version=column_property(select(func.max(alias.c.version_id)==cls.version_id).where(alias.c.id==cls.id))defnew_version(self,session):# optional - set previous version to have is_current_version=Falseold_id=self.idsession.query(self.__class__).filter_by(id=old_id).update(values=dict(is_current_version=False),synchronize_session=False)# make us transient (removes persistent# identity).make_transient(self)# increment version_id, which means we have a new PK.self.version_id+=1@event.listens_for(Session,"before_flush")defbefore_flush(session,flush_context,instances):forinstanceinsession.dirty:ifnotisinstance(instance,Versioned):continueifnotsession.is_modified(instance):continueifnotattributes.instance_state(instance).has_identity:continue# make it transientinstance.new_version(session)# re-addsession.add(instance)Base=declarative_base()engine=create_engine("sqlite://",echo=True)Session=sessionmaker(engine)# example 1, simple versioningclassExample(Versioned,Base):__tablename__="example"data=Column(String)Base.metadata.create_all(engine)session=Session()e1=Example(id=1,data="e1")session.add(e1)session.commit()e1.data="e2"session.commit()assert(session.query(Example.id,Example.version_id,Example.is_current_version,Example.calc_is_current_version,Example.data,).order_by(Example.id,Example.version_id).all()==([(1,1,False,False,"e1"),(1,2,True,True,"e2")]))# example 2, versioning with a parentclassParent(Base):__tablename__="parent"id=Column(Integer,primary_key=True)child_id=Column(Integer)child_version_id=Column(Integer)child=relationship("Child",backref=backref("parent",uselist=False))__table_args__=(ForeignKeyConstraint(["child_id","child_version_id"],["child.id","child.version_id"]),)classChild(Versioned,Base):__tablename__="child"data=Column(String)defnew_version(self,session):# expire parent's reference to ussession.expire(self.parent,["child"])# create new versionVersioned.new_version(self,session)# re-add ourselves to the parent. this causes the# parent foreign key to be updated alsoself.parent.child=selfBase.metadata.create_all(engine)session=Session()p1=Parent(child=Child(id=1,data="c1"))session.add(p1)session.commit()p1.child.data="c2"session.commit()assertp1.child_id==1assertp1.child.version_id==2assert(session.query(Child.id,Child.version_id,Child.is_current_version,Child.calc_is_current_version,Child.data,).order_by(Child.id,Child.version_id).all()==([(1,1,False,False,"c1"),(1,2,True,True,"c2")]))