import binascii

from sqlalchemy import event
from sqlalchemy import func
from sqlalchemy import Table
from sqlalchemy.sql import expression
from sqlalchemy.sql import type_coerce
from sqlalchemy.types import UserDefinedType


# Python datatypes


class GisElement(object):
    """Represents a geometry value.

    Subclasses are expected to provide a ``desc`` attribute or property;
    it is what ``str()`` and ``repr()`` display.
    """

    def __str__(self):
        return self.desc

    def __repr__(self):
        return "<%s at 0x%x; %r>" % (
            self.__class__.__name__,
            id(self),
            self.desc,
        )


class BinaryGisElement(GisElement, expression.Function):
    """Represents a Geometry value expressed as binary.

    The element is also a SQL function expression that renders as
    ``ST_GeomFromEWKB(<data>)`` typed as a binary-coercing ``Geometry``.
    """

    def __init__(self, data):
        # ``data`` is kept so that ``as_hex`` / ``desc`` can render it later.
        self.data = data
        expression.Function.__init__(
            self, "ST_GeomFromEWKB", data, type_=Geometry(coerce_="binary")
        )

    @property
    def desc(self):
        """Textual description of the value: its hexadecimal form."""
        return self.as_hex

    @property
    def as_hex(self):
        """Return the binary payload as a hexadecimal text string."""
        # hexlify() returns bytes on Python 3; decode so that the value can
        # be returned from __str__() and compared against str literals.
        return binascii.hexlify(self.data).decode("ascii")


class TextualGisElement(GisElement, expression.Function):
    """Represents a Geometry value expressed as text.

    The element is also a SQL function expression that renders as
    ``ST_GeomFromText(<desc>, <srid>)``.
    """

    def __init__(self, desc, srid=-1):
        self.desc = desc
        expression.Function.__init__(
            self, "ST_GeomFromText", desc, srid, type_=Geometry
        )


# SQL datatypes.


class Geometry(UserDefinedType):
    """Base PostGIS Geometry column type.

    :param dimension: stored on the type; passed to ``AddGeometryColumn``
        by the DDL hooks of this module.
    :param srid: spatial reference id stored on the type (default ``-1``).
    :param coerce_: ``"text"`` or ``"binary"``; selects which
        ``GisElement`` subclass wraps bound parameters and result values,
        and which ``ST_As...`` function wraps the column in SELECTs.
    """

    name = "GEOMETRY"

    def __init__(self, dimension=None, srid=-1, coerce_="text"):
        self.dimension = dimension
        self.srid = srid
        self.coerce = coerce_

    class comparator_factory(UserDefinedType.Comparator):
        """Define custom operations for geometry types."""

        # override the __eq__() operator
        def __eq__(self, other):
            return self.op("~=")(other)

        # add a custom operator
        def intersects(self, other):
            return self.op("&&")(other)

        # any number of GIS operators can be overridden/added here
        # using the techniques above.

    def _coerce_compared_value(self, op, value):
        # Always answer with this same type, whatever the operator or value.
        # NOTE(review): presumably this makes plain values compared against
        # a geometry column get bound as Geometry too -- confirm against
        # the SQLAlchemy TypeEngine documentation.
        return self

    def get_col_spec(self):
        """Return the DDL type name (the class-level ``name``)."""
        return self.name

    def bind_expression(self, bindvalue):
        """Wrap a bound parameter in the matching geometry constructor."""
        # NOTE(review): self.srid is not forwarded here; TextualGisElement
        # falls back to its own default srid of -1.
        if self.coerce == "text":
            return TextualGisElement(bindvalue)
        elif self.coerce == "binary":
            return BinaryGisElement(bindvalue)
        else:
            # explicit raise rather than ``assert False`` so that the check
            # is not stripped when Python runs with -O
            raise AssertionError(
                "unsupported coerce mode: %r" % (self.coerce,)
            )

    def column_expression(self, col):
        """Wrap a selected column in ``ST_AsText`` or ``ST_AsBinary``."""
        if self.coerce == "text":
            return func.ST_AsText(col, type_=self)
        elif self.coerce == "binary":
            return func.ST_AsBinary(col, type_=self)
        else:
            raise AssertionError(
                "unsupported coerce mode: %r" % (self.coerce,)
            )

    def bind_processor(self, dialect):
        """Return a function converting bound values for the DBAPI.

        ``GisElement`` instances are replaced by their ``desc``; any other
        value is passed through unchanged.
        """

        def process(value):
            if isinstance(value, GisElement):
                return value.desc
            else:
                return value

        return process

    def result_processor(self, dialect, coltype):
        """Return a function wrapping result values in a ``GisElement``.

        ``None`` results are passed through unchanged.
        """
        if self.coerce == "text":
            fac = TextualGisElement
        elif self.coerce == "binary":
            fac = BinaryGisElement
        else:
            raise AssertionError(
                "unsupported coerce mode: %r" % (self.coerce,)
            )

        def process(value):
            if value is not None:
                return fac(value)
            else:
                return value

        return process

    def adapt(self, impltype):
        """Build an ``impltype`` instance carrying this type's settings."""
        return impltype(
            dimension=self.dimension, srid=self.srid, coerce_=self.coerce
        )


# other datatypes can be added as needed.


class Point(Geometry):
    name = "POINT"


class Curve(Geometry):
    name = "CURVE"


class LineString(Curve):
    name = "LINESTRING"


# ... etc.


# DDL integration
# PostGIS historically has required AddGeometryColumn/DropGeometryColumn
# and other management methods in order to create PostGIS columns. Newer
# versions don't appear to require these special steps anymore.
# However, here we illustrate how to set up these features in any case.


def setup_ddl_events():
    """Register Table create/drop listeners that forward to ``dispatch``.

    Each listener passes a dash-separated event name together with the
    target table and the connection.
    """

    @event.listens_for(Table, "before_create")
    def before_create(target, connection, **kw):
        dispatch("before-create", target, connection)

    @event.listens_for(Table, "after_create")
    def after_create(target, connection, **kw):
        dispatch("after-create", target, connection)

    @event.listens_for(Table, "before_drop")
    def before_drop(target, connection, **kw):
        dispatch("before-drop", target, connection)

    @event.listens_for(Table, "after_drop")
    def after_drop(target, connection, **kw):
        dispatch("after-drop", target, connection)


def dispatch(event, table, bind):
    """Handle one DDL event for ``table``.

    :param event: one of ``"before-create"``, ``"after-create"``,
        ``"before-drop"``, ``"after-drop"``.  Note that inside this
        function the name shadows the imported ``event`` module.
    :param table: the ``Table`` being created or dropped.
    :param bind: connection used to run the Add/DropGeometryColumn calls.

    Before a create or drop, the table's column collection is swapped for
    one without the Geometry columns (the full collection is stashed in
    ``table.info["_saved_columns"]``); the matching "after" event restores
    it.  Geometry columns are dropped via ``DropGeometryColumn`` before the
    table is dropped and added via ``AddGeometryColumn`` after it is
    created.
    """
    # Imported here because the module otherwise only binds these names
    # inside the ``if __name__ == "__main__"`` section, which would make
    # this function raise NameError when the module is merely imported.
    from sqlalchemy import func, select

    if event in ("before-create", "before-drop"):
        regular_cols = [
            c for c in table.c if not isinstance(c.type, Geometry)
        ]
        gis_cols = set(table.c).difference(regular_cols)
        table.info["_saved_columns"] = table.c

        # temporarily patch a set of columns not including the
        # Geometry columns
        table.columns = expression.ColumnCollection(*regular_cols)

        if event == "before-drop":
            for c in gis_cols:
                bind.execute(
                    select(
                        func.DropGeometryColumn("public", table.name, c.name)
                    ).execution_options(autocommit=True)
                )

    elif event == "after-create":
        # restore the full column collection saved in "before-create"
        table.columns = table.info.pop("_saved_columns")
        for c in table.c:
            if isinstance(c.type, Geometry):
                bind.execute(
                    select(
                        func.AddGeometryColumn(
                            table.name,
                            c.name,
                            c.type.srid,
                            c.type.name,
                            c.type.dimension,
                        )
                    ).execution_options(autocommit=True)
                )
    elif event == "after-drop":
        # restore the full column collection saved in "before-drop"
        table.columns = table.info.pop("_saved_columns")


setup_ddl_events()


# illustrate usage
if __name__ == "__main__":
    from sqlalchemy import (
        create_engine,
        MetaData,
        Column,
        Integer,
        String,
        func,
        select,
    )
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy.ext.declarative import declarative_base

    engine = create_engine(
        "postgresql://scott:tiger@localhost/test", echo=True
    )
    metadata = MetaData(engine)
    Base = declarative_base(metadata=metadata)

    class Road(Base):
        __tablename__ = "roads"

        road_id = Column(Integer, primary_key=True)
        road_name = Column(String)
        road_geom = Column(Geometry(2))

    metadata.drop_all()
    metadata.create_all()

    session = sessionmaker(bind=engine)()

    # Add objects.  We can use strings...
    session.add_all(
        [
            Road(
                road_name="Jeff Rd",
                road_geom="LINESTRING(191232 243118,191108 243242)",
            ),
            Road(
                road_name="Geordie Rd",
                road_geom="LINESTRING(189141 244158,189265 244817)",
            ),
            Road(
                road_name="Paul St",
                road_geom="LINESTRING(192783 228138,192612 229814)",
            ),
            Road(
                road_name="Graeme Ave",
                road_geom="LINESTRING(189412 252431,189631 259122)",
            ),
            Road(
                road_name="Phil Tce",
                road_geom="LINESTRING(190131 224148,190871 228134)",
            ),
        ]
    )

    # or use an explicit TextualGisElement
    # (similar to saying func.GeomFromText())
    r = Road(
        road_name="Dave Cres",
        road_geom=TextualGisElement(
            "LINESTRING(198231 263418,198213 268322)", -1
        ),
    )
    session.add(r)

    # pre flush, the TextualGisElement represents the string we sent.
    assert str(r.road_geom) == "LINESTRING(198231 263418,198213 268322)"

    session.commit()

    # after flush and/or commit, the attribute still renders as the same
    # text.  (An earlier revision of this comment referred to
    # "PersistentGisElements", a class that is not defined in this file.)
    assert str(r.road_geom) == "LINESTRING(198231 263418,198213 268322)"

    r1 = session.query(Road).filter(Road.road_name == "Graeme Ave").one()

    # illustrate the overridden __eq__() operator.

    # strings come in as TextualGisElements
    r2 = (
        session.query(Road)
        .filter(Road.road_geom == "LINESTRING(189412 252431,189631 259122)")
        .one()
    )

    r3 = session.query(Road).filter(Road.road_geom == r1.road_geom).one()

    assert r1 is r2 is r3

    # core usage just fine:

    road_table = Road.__table__
    stmt = select(road_table).where(
        road_table.c.road_geom.intersects(r1.road_geom)
    )
    print(session.execute(stmt).fetchall())

    # TODO: for some reason the auto-generated labels have the internal
    # replacement strings exposed, even though PG doesn't complain

    # look up the hex binary version, using SQLAlchemy casts
    as_binary = session.scalar(
        select(type_coerce(r.road_geom, Geometry(coerce_="binary")))
    )
    assert as_binary.as_hex == (
        "01020000000200000000000000b832084100000000"
        "e813104100000000283208410000000088601041"
    )

    # back again, same method !
    as_text = session.scalar(
        select(type_coerce(as_binary, Geometry(coerce_="text")))
    )
    assert as_text.desc == "LINESTRING(198231 263418,198213 268322)"

    session.rollback()

    metadata.drop_all()