Mongo Database Driver for Blitz Max
For those who do not know, the Mongo database system is a very fast BSON/JSON-based NoSQL database. You can find out more about it here: http://www.mongodb.org/

I've been working on a Mongo driver for BlitzMax, which has now reached the point where I can retrieve a document from a Mongo collection. So yes, this is early stages: it is not feature complete and some functionality will definitely change.

I am not intending this as a full Mongo driver. I have very specific goals which I need to solve for my own purposes, but I see no reason why this code cannot be shared and made use of by other BlitzMax users who use Mongo databases. So whilst I do not intend to support shards or clusters, I will be supporting these things:

* To select a single document from a collection based upon multiple search criteria.
* To select a collection of documents from a database based upon multiple search criteria.
* To insert new documents into a collection.
* To update documents within a collection based upon a specific _id.

I am unlikely to need cursors so probably won't be including them, which means if you want to do paginated output with large datasets you'll have to modify my work.

-*-

The basic principle of the connector is as follows:

There is a Mongo class which connects to the default localhost and port unless told otherwise. You can change the host and port by passing them to connect when you connect to the server.

    Local Mongo:TMongo = New TMongo
    Mongo.connect( <server$ optional> , <port optional> )

It will return true if connected, and false if there was an error.

To select a database, call the use method:

    Mongo.use( "myDatabase" )

And here is where the concepts are a little different from most other things. Instead of calling a query and getting an array of rows back (Mongo has a flexible internal format), I've allowed a custom type to be passed in to the query, and your results will be returned cast to that type.
You get back a collection, which I've called a BSONCollection as it processes all of the Mongo internal BSON code. Clearly I need to explain this better, so...

    Type TMyOwnObjectFormat
        Field myVar , myOtherVar$
    End Type

    Local playerCollection:BSONCollection
    playerCollection = Mongo.get( "TMyOwnObjectFormat" , <some other params I've not explained yet> )
    For Local row:TMyOwnObjectFormat = EachIn playerCollection.documents
        'each row I get back is in my own data format! Cool :D
    Next

So there are a few more parameters we need to do a successful query. As well as the reflection class we already specified, we next need to give the collection name to fetch from; this is a simple string. And then we need to specify the search parameter. Now the search parameter is definitely going to change because I want to provide multiple parameters, but currently it's a simple string holding a name and value pair separated by an equals sign.

    name=value

So if I wanted to search for a variable called MongoIsGreat with a value of 5, I would give "MongoIsGreat=5".

At the moment the connector is only returning a single document; in fact this is the only functionality I have completed so far, as I only started last night.

So here goes - this is the Mongo driver:

    SuperStrict

    Const OP_REPLY% = 1 'Reply To a client request. responseTo is set
    Const OP_MSG% = 1000 'generic msg command followed by a String
    Const OP_UPDATE% = 2001 'update document
    Const OP_INSERT% = 2002 'insert New document
    Const RESERVED% = 2003 'formerly used For OP_GET_BY_OID
    Const OP_QUERY% = 2004 'query a collection
    Const OP_GET_MORE% = 2005 'Get more data from a query. See Cursors
    Const OP_DELETE% = 2006 'Delete documents
    Const OP_KILL_CURSORS% = 2007 'Tell database client is done with a cursor

    Const cstring$ = Chr(0)

    Type TMongo
        Field tcpSocket:TSocket
        Field tcpStream:TStream
        Field database$
        Field buffer:TBank = CreateBank( 4 )

        Method setBufferSize( bufferSize% )
            Self.buffer.resize( bufferSize )
        End Method

        Method connect%( mongoHost$ = "localhost" , mongoPort% = 27017 )
            DebugLog( "Connecting to " + mongoHost + ":" + mongoPort )
            Self.tcpSocket:TSocket = CreateTCPSocket()
            If( ConnectSocket( tcpSocket , HostIp( mongoHost ) , mongoPort ) )
                DebugLog( "Connection" )
                Self.tcpStream = CreateSocketStream( Self.tcpSocket , False )
                Return True
            Else
                DebugLog( "Could not resolve host" )
                Return False
            EndIf
        End Method

        Method use( databaseName$ )
            Self.database = databaseName
        End Method

        Method get:BSONCollection( returnType$ , collection$ , query$ = "" , limitFields$ = "" , maxResults% = 1 )
            Local requestID% = Rand( 0 , 2^32 )
            Local fullCollectionName$ = Self.database + "." + collection + cstring
            query = Self.QueryToBSON( Replace( query , "'" , Chr(34) ) )
            limitFields = Self.QueryToBSON( limitFields )
            Local size% = Len( query ) + Len( limitFields ) + Len( fullCollectionName ) + 28
            WriteInt( Self.tcpStream , size )
            WriteInt( Self.tcpStream , requestID )
            WriteInt( Self.tcpStream , 0 )
            WriteInt( Self.tcpStream , OP_QUERY )
            WriteInt( Self.tcpStream , 0 ) 'flags %100000 for full burst
            WriteString( Self.tcpStream , fullCollectionName )
            WriteInt( Self.tcpStream , 0 ) 'offset for pagination
            WriteInt( Self.tcpStream , maxResults ) 'limit
            WriteString( Self.tcpStream , query )
            WriteString( Self.tcpStream , limitFields )
            FlushStream( Self.tcpStream )
            Local output:BSONCollection = New BSONCollection
            output.setBSON( returnType , Self.listenFor( requestID ) )
            Return output
        End Method

        Method listenFor:TBankStream( requestID% )
            Local output:TBankStream
            Local gotHeader% = False
            Local gotData% = False
            Local messageLength% = 0
            Repeat
                If Self.tcpSocket
                    If Self.tcpSocket.ReadAvail() > 3
                        If Not gotHeader
                            gotHeader = True
                            messageLength = ReadInt( Self.tcpStream ) - 16
                            Local thisRequestID% = ReadInt( Self.tcpStream )
                            Local thisResponseTo% = ReadInt( Self.tcpStream )
                            Local opCode% = ReadInt( Self.tcpStream )
                            If( thisResponseTo = requestID ) DebugLog( "This is my data, it is " + messageLength + " bytes long" )
                        EndIf
                        If gotHeader And Self.tcpSocket.readAvail() >= messageLength
                            gotData = True
                            Local bank:TBank = CreateBank( messageLength )
                            output = CreateBankStream( bank )
                            WriteString( output , ReadString( Self.tcpStream , messageLength ) )
                        EndIf
                    EndIf
                EndIf
            Until gotData
            Return output
        End Method

        Method QuerytoBSON$( query$ )
            Local bson$ = ""
            If( Len( query ) = 0 )
                bson = Self.intString( 5 ) + Chr(0)
            Else
                Local qPart$[2]
                Local equal% = Instr( query , "=" )
                qPart[0] = Left( query , equal - 1 ) + cstring
                qPart[1] = Right( query , Len( query ) - equal ) + cstring
                Local vLength% = Len( qPart[1] )
                bson = Chr( 2 ) + qPart[0] + Self.intString( vLength ) + qPart[1] + Chr(0)
                Local size% = Len( bson ) + 4
                bson = Self.intString( size ) + bson
            EndIf
            Return bson
        End Method

        Method intString$( val:Int )
            Local output$ = ""
            PokeInt( Self.buffer , 0 , val )
            For Local i% = 0 To 3
                output :+ Chr( PeekByte( Self.buffer , i ) )
            Next
            Return output
        End Method
    End Type

    Type BSONCollection
        Field documents:TList = CreateList()

        Method setBSON( returnType$ , data:TBankStream )
            SeekStream( data , 0 )
            Local responseFlags% = ReadInt( data )
            Local cursorID:Long = ReadLong( data )
            Local startingFrom% = ReadInt( data )
            Local numberReturned% = ReadInt( data )
            If( numberReturned = 1 )
                Local obj:Object = Self.getDocument( returnType , data )
                ListAddLast( Self.documents , obj )
            Else
                DebugLog( "Multiple return objects" )
            EndIf
            CloseStream( data )
        End Method

        Method getDocument:Object( returnType$ , data:TBankStream )
            Local objectType:TTypeId = TTypeId.ForName( returnType )
            Local obj:Object = objectType.newObject()
            Local cmd%
            Local docsize% = ReadInt( data )
            'littleendianStream( data )
            Repeat
                cmd% = ReadByte( data )
                Select cmd
                    Case 1
                        Local name$ = ReadLine( data )
                        Local value! = ReadDouble( data )
                        DebugLog( name + " = " + value )
                        Local fld:TField = objectType.findField( name )
                        fld.setDouble( obj , value )
                    Case 2
                        Local name$ = ReadLine( data )
                        Local length% = ReadInt( data )
                        Local value$ = Left( ReadString( data , length ) , length - 1 )
                        DebugLog( name + " = " + value )
                        Local fld:TField = objectType.findField( name )
                        fld.setString( obj , value )
                    Case 4
                        Local name$ = ReadLine( data )
                        Local fld:TField = objectType.findField( name )
                        Local arrayObject:Object = fld.get( obj )
                        Local arrayId:TTypeId = TTypeId.ForObject( arrayObject )
                        Local arrayParam$[1]
                        Local arrayByteSize% = ReadInt( data )
                        Local arraySize% = 0
                        Local arrCmd%
                        Repeat
                            arrCmd = ReadByte( data )
                            Select arrCmd
                                Case 1
                                    Local index% = Int( ReadLine( data ).toInt() )
                                    Local value! = ReadDouble( data )
                                    arrayParam[ 0 ] = String( Int( value ) )
                                    DebugLog( name + "[ " + index + " ] = " + arrayParam[0] )
                                    arrayId.setArrayElement( arrayObject , index , arrayParam[0] )
                                Default
                            End Select
                        Until arrCmd = 0
                        DebugLog( "Done Array " + name )
                        Local endArrayDocument% = ReadByte( data )
                    Case 7
                        Local name$ = ReadLine( data )
                        Local fld:TField = objectType.findField( name )
                        fld.setString( obj , ReadString( data , 12 ) )
                    Default
                        DebugLog( "Unprocessed command " + cmd )
                End Select
            Until cmd = 0 Or Eof( data )
            'bigEndianStream( data )
            Return obj
        End Method
    End Type

To test it I use my JSON serialisation super class and a small sample object which I have data for. I will include them here only to demonstrate how I am using this stuff, and not as part of the connector.

    Type JSON
        Method getJSON$()
            Local json$
            Local id:TTypeId = TTypeId.ForObject( Self )
            For Local fld:TField = EachIn id.EnumFields()
                Local fieldName$ = Chr( 34 ) + fld.Name() + Chr( 34 ) + ":"
                Local elementName$ = fld.TypeId().Name()
                If( Right( elementName , 2 ) = "[]" )
                    json :+ fieldName + "["
                    Local arrayObject:Object = fld.get( Self )
                    Local arrayId:TTypeId = TTypeId.ForObject( arrayObject )
                    Local length% = fld.typeId().ArrayLength( fld.get( Self ) , 0 )
                    For Local i% = 0 To length - 1
                        Local arrayElement:Object = arrayId.getArrayElement( arrayObject , i )
                        json :+ arrayElement.toString() + ","
                    Next
                    json = Left( json , Len( json ) - 1 ) + "],"
                Else
                    Select elementName
                        Case "String"
                            json :+ fieldName + Chr(34) + fld.getString$( Self ) + Chr(34) + ","
                        Case "Int"
                            json :+ fieldName + fld.getInt( Self ) + ","
                        Default
                            DebugLog( fld.Name() + " is of unprocessed type " + fld.typeId().Name() )
                    End Select
                EndIf
            Next
            Return "{" + Left( json , Len( json ) - 1 ) + "}"
        End Method

        Method setJSON( json$ )
            json = Left( Right( json , Len( json ) - 1 ) , Len( json ) - 1 ) + ","
            Local comma% = Instr( json , "," )
            Local square% = Instr( json , "[" )
            If square < comma Then comma = Instr( json , "]" , square ) + 1
            Local id:TTypeId = TTypeId.ForObject( Self )
            While( comma > 0 )
                Local this$ = Left( json , comma )
                Local colon% = Instr( this , ":" )
                Local cmd$ = Mid( this , 2 , colon - 3 )
                Local param$ = Mid( this , colon + 1 , Len( this ) - ( colon + 1 ) )
                Local paramInt% = 0
                Local fld:TField = id.findField( cmd )
                If( Left( param, 1 ) = "[" And Right( param, 1 ) = "]" ) 'array
                    Local arrayObject:Object = fld.get( Self )
                    Local arrayId:TTypeId = TTypeId.ForObject( arrayObject )
                    Local length% = fld.typeId().ArrayLength( fld.get( Self ) , 0 )
                    Local paramArray$[1]
                    Local aparam$ = Mid( param , 2 , Len( param ) - 2 )
                    For Local i% = 0 To length - 1
                        Local acomma% = Instr( aparam , "," )
                        paramArray[0] = Left( aparam , acomma - 1 )
                        aparam = Right( aparam , Len( aparam ) - acomma )
                        arrayId.setArrayElement( arrayObject , i , paramArray[0] )
                    Next
                Else
                    If( Left( param , 1 ) = Chr(34) And Right( param , 1 ) = Chr( 34 ) ) 'string
                        param = Mid( param , 2 , Len( param ) - 2 )
                        If fld Then fld.setString( Self , param )
                    Else
                        paramInt = Int( param )
                        If fld Then fld.setInt( Self , paramInt )
                    EndIf
                EndIf
                If Not fld And Len( cmd ) > 1 Then DebugLog( "Could not find field " + cmd )
                json = Right( json , Len( json ) - comma )
                comma = Instr( json , "," )
                square = Instr( json , "[" )
                If square < comma Then comma = Instr( json , "]" , square ) + 1
            Wend
        End Method
    End Type

    Type TPlayer Extends JSON
        Field _id$
        Field prefix$
        Field name$
        Field surname$
        Field suffix$
        Field race$ , class$
        Field gender% , level% , face% , hair%
        Field strength% , strengthPercentile% , dexterity% , constitution% , inteligence% , wisdom% , charisma%
        Field thaco% , ac% , hp% , weaponDice%
        Field equipment%[20]
    End Type

And here is some code that shows how I pull it all together to perform a Mongo fetch:

    Local Mongo:TMongo = New TMongo
    If( Mongo.connect() )
        Local playerCollection:BSONCollection
        Mongo.use( "osric" )
        playerCollection = Mongo.get( "Tplayer" , "players" , "surname=Rose" )
        For Local player:TPlayer = EachIn playerCollection.documents
            DebugLog( player.getJSON() )
        Next
    EndIf
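As a rough illustration of what a name=value search string turns into on the wire, here is a minimal sketch that builds the same kind of BSON document in a TBank rather than in a string. EncodeStringQuery is a hypothetical helper and not part of the driver above. It assumes ASCII-only names and values and a little-endian machine, since PokeInt writes in native byte order. Note that the value is always written as a BSON string (element type 2), so a search such as "MongoIsGreat=5" would match the string "5" rather than the number 5.

```blitzmax
SuperStrict

Local q:TBank = EncodeStringQuery( "surname" , "Rose" )
Print "Document is " + BankSize( q ) + " bytes" ' prints "Document is 23 bytes"

' Hypothetical helper: encodes one name/value pair as a BSON document
' containing a single string element (element type 2).
' Assumes ASCII-only text and a little-endian machine.
Function EncodeStringQuery:TBank( name$ , value$ )
    Local nameLen% = name.length + 1 ' field name plus its zero terminator
    Local valueLen% = value.length + 1 ' string value plus its zero terminator
    ' int32 document size + type byte + name + int32 string length + value + document terminator
    Local total% = 4 + 1 + nameLen + 4 + valueLen + 1
    Local bank:TBank = CreateBank( total )
    Local pos% = 0
    PokeInt( bank , pos , total ) ' total document length, including these 4 bytes
    pos :+ 4
    PokeByte( bank , pos , 2 ) ' element type 2 = string
    pos :+ 1
    For Local i% = 0 Until name.length
        PokeByte( bank , pos , name[i] )
        pos :+ 1
    Next
    PokeByte( bank , pos , 0 ) ' end of field name
    pos :+ 1
    PokeInt( bank , pos , valueLen ) ' string length, terminator included
    pos :+ 4
    For Local i% = 0 Until value.length
        PokeByte( bank , pos , value[i] )
        pos :+ 1
    Next
    PokeByte( bank , pos , 0 ) ' end of string value
    PokeByte( bank , pos + 1 , 0 ) ' end of document
    Return bank
End Function
```

The byte layout is the same one QuerytoBSON produces by string concatenation; working in a bank simply avoids passing binary data through strings.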
-*-
Fixed an issue with arrays, and it now supports returning multiple results, although the default is 1.

Also a note I forgot to mention earlier: the reflection class that you pass to Mongo.get must have a Field _id$ property. This will hold the Mongo document index.

    Type TMongo
        Field tcpSocket:TSocket
        Field tcpStream:TStream
        Field database$
        Field buffer:TBank = CreateBank( 4 )

        Method setBufferSize( bufferSize% )
            Self.buffer.resize( bufferSize )
        End Method

        Method connect%( mongoHost$ = "localhost" , mongoPort% = 27017 )
            DebugLog( "Connecting to " + mongoHost + ":" + mongoPort )
            Self.tcpSocket:TSocket = CreateTCPSocket()
            If( ConnectSocket( tcpSocket , HostIp( mongoHost ) , mongoPort ) )
                DebugLog( "Connection" )
                Self.tcpStream = CreateSocketStream( Self.tcpSocket , False )
                Return True
            Else
                DebugLog( "Could not resolve host" )
                Return False
            EndIf
        End Method

        Method use( databaseName$ )
            Self.database = databaseName
        End Method

        Method get:BSONCollection( returnType$ , collection$ , query$ = "" , limitFields$ = "" , maxResults% = 1 )
            Local requestID% = Rand( 0 , 2^32 )
            Local fullCollectionName$ = Self.database + "." + collection + cstring
            query = Self.QueryToBSON( Replace( query , "'" , Chr(34) ) )
            limitFields = Self.QueryToBSON( limitFields )
            Local size% = Len( query ) + Len( limitFields ) + Len( fullCollectionName ) + 28
            WriteInt( Self.tcpStream , size )
            WriteInt( Self.tcpStream , requestID )
            WriteInt( Self.tcpStream , 0 )
            WriteInt( Self.tcpStream , OP_QUERY )
            WriteInt( Self.tcpStream , 0 ) 'flags %100000 for full burst
            WriteString( Self.tcpStream , fullCollectionName )
            WriteInt( Self.tcpStream , 0 ) 'offset for pagination
            WriteInt( Self.tcpStream , maxResults ) 'limit
            WriteString( Self.tcpStream , query )
            WriteString( Self.tcpStream , limitFields )
            FlushStream( Self.tcpStream )
            Local output:BSONCollection = New BSONCollection
            output.setBSON( returnType , Self.listenFor( requestID ) )
            Return output
        End Method

        Method listenFor:TBankStream( requestID% )
            Local output:TBankStream
            Local gotHeader% = False
            Local gotData% = False
            Local messageLength% = 0
            Repeat
                If Self.tcpSocket
                    If Self.tcpSocket.ReadAvail() > 3
                        If Not gotHeader
                            gotHeader = True
                            messageLength = ReadInt( Self.tcpStream ) - 16
                            Local thisRequestID% = ReadInt( Self.tcpStream )
                            Local thisResponseTo% = ReadInt( Self.tcpStream )
                            Local opCode% = ReadInt( Self.tcpStream )
                            If( thisResponseTo = requestID ) DebugLog( "This is my data, it is " + messageLength + " bytes long" )
                        EndIf
                        If gotHeader And Self.tcpSocket.readAvail() >= messageLength
                            gotData = True
                            Local bank:TBank = CreateBank( messageLength )
                            output = CreateBankStream( bank )
                            WriteString( output , ReadString( Self.tcpStream , messageLength ) )
                        EndIf
                    EndIf
                EndIf
            Until gotData
            Return output
        End Method

        Method QuerytoBSON$( query$ )
            Local bson$ = ""
            If( Len( query ) = 0 )
                bson = Self.intString( 5 ) + Chr(0)
            Else
                Local qPart$[2]
                Local equal% = Instr( query , "=" )
                qPart[0] = Left( query , equal - 1 ) + cstring
                qPart[1] = Right( query , Len( query ) - equal ) + cstring
                Local vLength% = Len( qPart[1] )
                bson = Chr( 2 ) + qPart[0] + Self.intString( vLength ) + qPart[1] + Chr(0)
                Local size% = Len( bson ) + 4
                bson = Self.intString( size ) + bson
            EndIf
            Return bson
        End Method

        Method intString$( val:Int )
            Local output$ = ""
            PokeInt( Self.buffer , 0 , val )
            For Local i% = 0 To 3
                output :+ Chr( PeekByte( Self.buffer , i ) )
            Next
            Return output
        End Method
    End Type

    Type BSONCollection
        Field documents:TList = CreateList()

        Method setBSON( returnType$ , data:TBankStream )
            SeekStream( data , 0 )
            Local responseFlags% = ReadInt( data )
            Local cursorID:Long = ReadLong( data )
            Local startingFrom% = ReadInt( data )
            Local numberReturned% = ReadInt( data )
            If( numberReturned = 1 )
                Local obj:Object = Self.getDocument( returnType , data )
                ListAddLast( Self.documents , obj )
            Else
                For Local rows% = 1 To numberReturned
                    Local obj:Object = Self.getDocument( returnType , data )
                    ListAddLast( Self.documents , obj )
                    If rows < numberReturned
                        Local docSeparator%
                        docSeparator = ReadShort( data )
                    EndIf
                    DebugLog( "---" )
                Next
            EndIf
            CloseStream( data )
        End Method

        Method getDocument:Object( returnType$ , data:TBankStream )
            Local docsize% = ReadInt( data )
            DebugLog( "DOCSIZE " + docsize )
            Local objectType:TTypeId = TTypeId.ForName( returnType )
            Local obj:Object = objectType.newObject()
            Local cmd%
            'littleendianStream( data )
            Repeat
                cmd% = ReadByte( data )
                DebugLog( "CMD " + cmd )
                Select cmd
                    Case 1
                        Local name$ = ReadLine( data )
                        Local value! = ReadDouble( data )
                        Local fld:TField = objectType.findField( name )
                        fld.setDouble( obj , value )
                    Case 2
                        Local name$ = ReadLine( data )
                        Local length% = ReadInt( data )
                        Local value$ = Left( ReadString( data , length ) , length - 1 )
                        Local fld:TField = objectType.findField( name )
                        fld.setString( obj , value )
                    Case 4
                        Local name$ = ReadLine( data )
                        Local fld:TField = objectType.findField( name )
                        Local arrayObject:Object = fld.get( obj )
                        Local arrayId:TTypeId = TTypeId.ForObject( arrayObject )
                        Local arrayParam$[1]
                        Local pointerPosition% = StreamPos( data ) + ReadInt( data )
                        Local arraySize% = 0
                        Local arrCmd%
                        Repeat
                            arrCmd = ReadByte( data )
                            Select arrCmd
                                Case 1
                                    Local index% = Int( ReadLine( data ).toInt() )
                                    Local value! = ReadDouble( data )
                                    arrayParam[ 0 ] = String( Int( value ) )
                                    arrayId.setArrayElement( arrayObject , index , arrayParam[0] )
                                Default
                            End Select
                        Until arrCmd = 0
                        Local endArrayDocument% = ReadByte( data )
                        SeekStream( data , pointerPosition )
                    Case 7
                        Local name$ = ReadLine( data )
                        Local fld:TField = objectType.findField( name )
                        Local value$ = ReadString( data , 12 )
                        fld.setString( obj , value )
                    Case 0 'document end
                    Default
                        DebugLog( "Unprocessed " + cmd )
                End Select
            Until cmd = 0 Or Eof( data )
            SeekStream( data , StreamPos( data ) - 2 )
            'bigEndianStream( data )
            Return obj
        End Method
    End Type
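In the MongoDB wire protocol, the documents in an OP_REPLY are stored back to back with no separator, and each one begins with an int32 giving its total length. So one possible alternative to reading a separator and seeking back is to step from one document to the next using that length. The following is only a minimal sketch of the idea: DocumentOffsets is a hypothetical helper that is not part of the driver, and the offset of 20 assumes the 16-byte message header has already been stripped, as listenFor does, leaving responseFlags, cursorID, startingFrom and numberReturned in front of the first document. It also assumes a little-endian machine.

```blitzmax
SuperStrict

' Build a fake reply body for demonstration: 20 bytes of reply fields
' followed by two empty BSON documents (int32 length 5 plus a zero terminator).
Local body:TBank = CreateBank( 30 )
For Local i% = 0 Until 30
    PokeByte( body , i , 0 )
Next
PokeInt( body , 16 , 2 ) ' numberReturned
PokeInt( body , 20 , 5 ) ' length of the first document
PokeInt( body , 25 , 5 ) ' length of the second document

Local offsets:Int[] = DocumentOffsets( body , 20 , PeekInt( body , 16 ) )
For Local i% = 0 Until offsets.length
    Print "Document at byte " + offsets[i] ' prints 20, then 25
Next

' Hypothetical helper: returns the start offset of each document by
' following the int32 length prefix of the previous one.
Function DocumentOffsets:Int[]( body:TBank , firstDocOffset% , count% )
    Local offsets:Int[] = New Int[count]
    Local pos% = firstDocOffset
    For Local i% = 0 Until count
        offsets[i] = pos
        pos :+ PeekInt( body , pos ) ' the length covers the whole document, terminator included
    Next
    Return offsets
End Function
```

With the offsets known up front, each document could be parsed from its own start position, which would also make it possible to skip a document containing element types the parser does not handle yet.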