Mongo Database Driver for Blitz Max

Banshee(Posted 2014) [#1]
For those who do not know, Mongo is a very fast NoSQL database that stores its documents as BSON, a binary form of JSON. You can find out more about it here: http://www.mongodb.org/

I've been working on a Mongo Driver for BlitzMax which has now reached the point that I can retrieve a document from a Mongo collection.

So yes, this is at an early stage: it is not feature complete, and some functionality will definitely change.

I do not intend this to be a full Mongo driver. I have very specific goals that I need to solve for my own purposes, but I see no reason why this code cannot be shared and used by other BlitzMax users who work with Mongo databases.

So whilst I do not intend to support shards or clusters, I will be supporting these things:

* To select a single document from a collection based upon multiple search criteria.
* To select a collection of documents from a database based upon multiple search criteria.
* To insert new documents into a collection.
* To update documents within a collection based upon a specific _id.

I am unlikely to need cursors so probably won't be including them, which means if you want to do paginated output with large datasets you'll have to modify my work.

-*-

The basic principle of the connector is as follows:

There is a TMongo class whose connect method defaults to localhost and the standard Mongo port, 27017. You can pass a different host and port when you connect to the server.

Local Mongo:TMongo = New TMongo
Mongo.connect( <server$ optional> , <port optional> )


It will return true if connected, and false if there was an error.

To select a database, call the use method:

Mongo.use( "myDatabase" )


This is where the concepts differ a little from most other database drivers. Instead of running a query and getting an array of rows back, you pass the name of one of your own types to the query, and each result comes back as an object of that type. This suits Mongo, whose documents have a flexible format.

You get back a collection, which I've called a BSONCollection because it handles all of the decoding of Mongo's internal BSON format.

Clearly I need to explain this better, so...

Type TMyOwnObjectFormat
    field myVar , myOtherVar$
end Type

local playerCollection:BSONCollection
playerCollection = Mongo.get( "TMyOwnObjectFormat" , <some other params I've not explained yet> )
for local row:TMyOwnObjectFormat = EachIn playerCollection.documents
    'each row I get back is in my own data format! Cool :D
next
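
The casting works through BlitzMax reflection. As a minimal standalone sketch of the mechanism (an illustration, not the driver itself; the field values are made up), a type can be looked up by its name as a string, instantiated, and have its fields filled in by name before being cast back to the concrete type:

```blitzmax
SuperStrict

Type TMyOwnObjectFormat
	Field myVar%
	Field myOtherVar$
End Type

' Look the type up by name, in the same way the driver treats the string passed to Mongo.get
Local typeId:TTypeId = TTypeId.ForName( "TMyOwnObjectFormat" )
Local obj:Object = typeId.NewObject()

' Fill the fields by name; these values are invented for the example
typeId.FindField( "myVar" ).SetInt( obj , 5 )
typeId.FindField( "myOtherVar" ).SetString( obj , "hello" )

' Cast back to the concrete type, which is what the For ... EachIn loop does implicitly
Local row:TMyOwnObjectFormat = TMyOwnObjectFormat( obj )
Print "myVar=" + row.myVar + " myOtherVar=" + row.myOtherVar
```

Because the lookup is by name, the field names in your type need to match the field names stored in the Mongo documents.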


A successful query needs a few more parameters. After the reflection type name we already specified comes the name of the collection to fetch from, which is a simple string, and then the search parameter.

The search parameter is definitely going to change because I want to support multiple criteria, but currently it is a simple string holding a name and value pair separated by an equals sign:

name=value

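So if I wanted to search for a field called MongoIsGreat with a value of 5, I would pass "MongoIsGreat=5". Note that the driver currently sends the value as a BSON string, so this matches the text "5" rather than the number 5.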
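
To make the format concrete, here is a rough standalone sketch of how such a pair could be packed into a BSON document holding a single string element. The helper names Int32LE, PairToBSON and HexDump are made up for this illustration; the layout follows the BSON specification: document size, element type 2 for a string, null-terminated name, string length, null-terminated value, document terminator.

```blitzmax
SuperStrict

' Pack a 32-bit integer into four characters, least significant byte first
Function Int32LE$( value% )
	Local out$ = ""
	For Local i% = 0 To 3
		out :+ Chr( ( value Shr ( i * 8 ) ) & $FF )
	Next
	Return out
End Function

' Encode one "name=value" pair as a BSON document with a single string element
Function PairToBSON$( pair$ )
	Local equal% = Instr( pair , "=" )
	Local name$ = Left( pair , equal - 1 ) + Chr( 0 )
	Local value$ = Right( pair , Len( pair ) - equal ) + Chr( 0 )
	' type byte, name, length of value including its null, value, document terminator
	Local body$ = Chr( 2 ) + name + Int32LE( Len( value ) ) + value + Chr( 0 )
	' the leading size counts its own four bytes as well
	Return Int32LE( Len( body ) + 4 ) + body
End Function

' Show each character of the encoded document as a two digit hex byte
Function HexDump$( data$ )
	Local out$ = ""
	For Local i% = 0 Until Len( data )
		out :+ Right( Hex( data[i] ) , 2 ) + " "
	Next
	Return out.Trim()
End Function

Print HexDump( PairToBSON( "MongoIsGreat=5" ) )
```

For this input the document should come to 25 bytes: 19 00 00 00, then 02 and the name with its terminating 00, then 02 00 00 00 as the string length, 35 00 for the value, and a final 00.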

At the moment the connector only returns a single document. In fact this is the only functionality I have completed so far, as I only started last night.
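
For orientation, the request that the driver's get method writes to the socket is a Mongo OP_QUERY message, and the result limit is one of its fields. The following is a rough standalone sketch that assembles the same layout in a bank instead of a socket so it can be inspected. BuildQuery is a hypothetical helper, and the sketch assumes a little-endian machine such as x86, since integers are written in native byte order.

```blitzmax
SuperStrict

Const OP_QUERY% = 2004

' Assemble an OP_QUERY request in memory; the documents are passed in already BSON encoded
Function BuildQuery:TBank( requestID% , fullCollectionName$ , queryDoc$ , fieldsDoc$ , maxResults% )
	Local name$ = fullCollectionName + Chr( 0 )
	' 16 byte header + 4 flags + 4 skip + 4 limit = 28 fixed bytes
	Local size% = 28 + Len( name ) + Len( queryDoc ) + Len( fieldsDoc )
	Local bank:TBank = CreateBank( size )
	Local stream:TBankStream = CreateBankStream( bank )
	WriteInt( stream , size )         ' messageLength, including these four bytes
	WriteInt( stream , requestID )    ' requestID chosen by the client
	WriteInt( stream , 0 )            ' responseTo, unused in a request
	WriteInt( stream , OP_QUERY )     ' opCode
	WriteInt( stream , 0 )            ' flags
	WriteString( stream , name )      ' "database.collection" as a C string
	WriteInt( stream , 0 )            ' number of documents to skip
	WriteInt( stream , maxResults )   ' number of documents to return
	WriteString( stream , queryDoc )  ' BSON query document
	WriteString( stream , fieldsDoc ) ' BSON field selector
	CloseStream( stream )
	Return bank
End Function

' An empty BSON document: a size of 5 followed by the terminating 0
Local emptyDoc$ = Chr( 5 ) + Chr( 0 ) + Chr( 0 ) + Chr( 0 ) + Chr( 0 )
Local msg:TBank = BuildQuery( 1 , "osric.players" , emptyDoc , emptyDoc , 1 )
Print "Message is " + BankSize( msg ) + " bytes, header says " + PeekInt( msg , 0 )
```

With the 14 byte collection name and two 5 byte empty documents, both numbers should come out as 52.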

So here goes - this is the Mongo Driver
SuperStrict
Const	OP_REPLY%		= 1	'Reply To a client request. responseTo is set
Const	OP_MSG%		= 1000	'generic msg command followed by a String
Const	OP_UPDATE%		= 2001	'update document
Const	OP_INSERT%		= 2002	'insert New document
Const	RESERVED%		= 2003	'formerly used For OP_GET_BY_OID
Const	OP_QUERY%		= 2004	'query a collection
Const	OP_GET_MORE%	= 2005	'Get more data from a query. See Cursors
Const	OP_DELETE%		= 2006	'Delete documents
Const	OP_KILL_CURSORS%	= 2007	'Tell database client is done with a cursor
Const cstring$			= Chr(0)


Type TMongo

	Field tcpSocket:TSocket
	Field tcpStream:TStream
		
	Field database$
	Field buffer:TBank = CreateBank( 4 )
		
	Method setBufferSize( bufferSize% )
		Self.buffer.resize( bufferSize )
	End Method
		
	Method connect%( mongoHost$ = "localhost" , mongoPort% = 27017 )
	
		DebugLog( "Connecting to " + mongoHost + ":" + mongoPort )
	
		Self.tcpSocket = CreateTCPSocket()
		If( ConnectSocket( Self.tcpSocket , HostIp( mongoHost ) , mongoPort ) )
			DebugLog( "Connected" )
			Self.tcpStream = CreateSocketStream( Self.tcpSocket , False )
			Return True
		Else
			'Either the host name did not resolve or the server refused the connection
			DebugLog( "Could not connect to " + mongoHost + ":" + mongoPort )
			Return False
		EndIf
	
	End Method
	
	Method use( databaseName$ )
		Self.database = databaseName
	End Method
	
	Method get:BSONCollection( returnType$ , collection$ , query$ = "" , limitFields$ = "" , maxResults% = 1 )
		
		Local requestID% = Rand( 1 , $3FFFFFFF ) 'stay inside the positive Int range; 2^32 does not fit in an Int
		
		Local fullCollectionName$ = Self.database + "." + collection + cstring
		query = Self.QueryToBSON( Replace( query , "'" , Chr(34) ) )
		limitFields = Self.QueryToBSON( limitFields )
		
		Local size% = Len( query ) + Len( limitFields ) + Len( fullCollectionName ) + 28
		
		WriteInt( Self.tcpStream , size )
		WriteInt( Self.tcpStream , requestID )
		WriteInt( Self.tcpStream , 0 )
		WriteInt( Self.tcpStream , OP_QUERY )
		WriteInt( Self.tcpStream , 0 )'flags %100000 for full burst
		WriteString( Self.tcpStream , fullCollectionName ) 
		WriteInt( Self.tcpStream , 0 ) 'offset for pagination
		WriteInt( Self.tcpStream , maxResults ) 'limit
		WriteString( Self.tcpStream , query )
		WriteString( Self.tcpStream , limitFields )
		FlushStream( Self.tcpStream )
		
		Local output:BSONCollection= New BSONCollection
		output.setBSON( returnType , Self.listenFor( requestID ) )
		Return output
		
	End Method
	
	Method listenFor:TBankStream( requestID% )

		Local output:TBankStream

		Local gotHeader% = False
		Local gotData% = False
		
		Local messageLength% = 0

		Repeat	
				
			If Self.tcpSocket
			
				If Self.tcpSocket.ReadAvail() > 3
					
					If Not gotHeader
						gotHeader = True
						
						messageLength = ReadInt( Self.tcpStream ) - 16
						Local thisRequestID% = ReadInt( Self.tcpStream )
						Local thisResponseTo% = ReadInt( Self.tcpStream )
						Local opCode% = ReadInt( Self.tcpStream )
						If( thisResponseTo = requestID ) DebugLog( "This is my data, it is " + messageLength + " bytes long" )		
						
					endIf

					If gotHeader And Self.tcpSocket.readAvail() >= messageLength
						gotData = true
						Local bank:TBank = CreateBank( messageLength )
						output = CreateBankStream( bank )
						WriteString( output , ReadString ( Self.tcpStream , messageLength ) )
					EndIf

				EndIf
	
			EndIf
	
		Until gotData
		
		Return output
	
	End Method
	
	Method QuerytoBSON$( query$ )

		Local bson$ = ""
		
		If( Len( query ) = 0 )
		
			bson = Self.intString( 5 ) + Chr(0)
		
		else
		
			Local qPart$[2]
			Local equal% = Instr( query , "=" )
			qPart[0] = Left( query , equal - 1 ) + cstring
			qPart[1] = Right( query , Len( query ) - equal ) + cstring
			
			Local vLength% = Len( qPart[1] )
			bson = Chr( 2 ) + qPart[0] + Self.intString( vLength ) + qPart[1] + Chr(0)
			
			Local size% = Len( bson ) + 4
			bson = Self.intString( size ) + bson
		
		endIf
		
		Return bson
		
	End method
	Method intString$( val:Int )
		
		Local output$ = ""
		PokeInt( Self.buffer , 0 , val )
		
		For Local i% = 0 To 3
			output :+ chr( PeekByte( Self.buffer , i ) )
		next
		
		Return output
		
	End method
	
End Type

Type BSONCollection

	Field documents:TList = CreateList()

	Method setBSON( returnType$ , data:TBankStream )
		
		SeekStream( data , 0 )
		Local responseFlags% = ReadInt( data )
		Local cursorID:Long = ReadLong( data )
		Local startingFrom% = ReadInt( data )
		Local numberReturned% = ReadInt( data )
		
		If( numberReturned = 1 )

			Local obj:Object = Self.getDocument( returnType , data )
			ListAddLast( Self.documents , obj )

		Else
			DebugLog( "Multiple return objects" )
		endIf
		
		CloseStream( data )
	
	End method
	Method getDocument:Object( returnType$ , data:TBankStream  )
		
		Local objectType:TTypeId = TTypeId.ForName( returnType )
		Local obj:Object = objectType.newObject()
		
		Local cmd%
		Local docsize% = ReadInt( data )

		'littleendianStream( data )

		Repeat
			cmd% = ReadByte( data )	
		
			Select cmd
				Case 1
					Local name$ = ReadLine( data )
					Local value! = ReadDouble( data )
					
					DebugLog( name + " = " + value )

					Local fld:TField = objectType.findField( name )
					fld.setDouble( obj , value )
				Case 2
					Local name$ = ReadLine( data )
					Local length% = ReadInt( data )
					Local value$ = Left( ReadString( data , length ) , length - 1 )

					DebugLog( name + " = " + value )

					Local fld:TField = objectType.findField( name )
					fld.setString( obj , value )
				Case 4
					Local name$ = ReadLine( data )

					Local fld:TField = objectType.findField( name )
					Local arrayObject: Object= fld.get( obj )
					Local arrayId:TTypeId = TTypeId.ForObject( arrayObject )

					Local arrayParam$[1]
					Local arrayByteSize% = ReadInt( data )
					Local arraySize% = 0
					
					Local arrCmd%
					Repeat
						arrCmd = ReadByte( data )
						Select arrCmd
							Case 1
								Local index% = Int( ReadLine( data ).toInt() )
								Local value! = ReadDouble( data )

								arrayParam[ 0 ] = String( Int( value ) )
								DebugLog( name +"[ " + index + " ] = " + arrayParam[0] )
								arrayId.setArrayElement( arrayObject , index , arrayParam[0] )

							Default
						End select
					Until arrCmd = 0

					DebugLog( "Done Array " + name )	
					
					'The 0 byte that ended the loop above was the array terminator, so there is nothing more to skip
				Case 7
					Local name$ = ReadLine( data )
					
					Local fld:TField = objectType.findField( name )
					fld.setString( obj , ReadString( data , 12 ) )
				Default
					DebugLog( "Unprocessed command " + cmd )
			End Select
			
		Until cmd = 0 Or Eof( data )
	
		'bigEndianStream( data )

		Return obj
		
	End Method
	
End Type


To test it I use my JSON serialisation superclass and a small sample object that I have data for. I include them here only to demonstrate how I am using the driver; they are not part of the connector.

Type JSON
	
	Method getJSON$()
		
		Local json$
		
		Local id:TTypeId = TTypeId.ForObject( Self )
		For Local fld:TField = EachIn id.EnumFields()
			
			Local fieldName$ = Chr( 34 ) + fld.Name() + Chr( 34 ) + ":"
			Local elementName$ = fld.TypeId().Name()
			
			If( Right( elementName , 2 ) = "[]" )
			
				json :+ fieldName + "["
			
				Local arrayObject: Object= fld.get( Self )
				Local arrayId:TTypeId = TTypeId.ForObject( arrayObject )
				Local length% = fld.typeId().ArrayLength( fld.get( Self ) , 0 )
				
				For Local i% = 0 To length - 1
					Local arrayElement:Object = arrayId.getArrayElement( arrayObject , i )
					json :+ arrayElement.toString() + ","
				Next
				
				json = Left( json , Len( json ) - 1 ) + "],"
			
			Else
				Select elementName
				
					Case "String"
						json :+ fieldName + Chr(34) + fld.getString$( Self ) + Chr(34) + ","
					Case "Int"
						json :+ fieldName + fld.getInt( Self ) + ","
					Default
						DebugLog( fld.Name() +" is of unprocessed type " + fld.typeId().Name() )
				End Select
			EndIf
			
		Next
		
		Return "{" + Left( json , Len( json ) - 1 ) + "}"

	End Method
	
	Method setJSON( json$ )
		
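		'Strip the surrounding braces and append a comma so that every pair ends with one
		json = Mid( json , 2 , Len( json ) - 2 ) + ","
		Local comma% = Instr( json , "," )
		Local square% = Instr( json , "[" )
		'Only skip to the closing bracket when an array really starts before the next comma
		If square > 0 And square < comma Then comma = Instr( json , "]" , square ) + 1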
		
		Local id:TTypeId = TTypeId.ForObject( Self )
		
		While( comma > 0 )

			Local this$ = Left( json , comma )
			Local colon% = Instr( this , ":" )
			Local cmd$ = Mid( this , 2 , colon - 3 )
			Local param$ = Mid( this , colon + 1 , Len( this ) - ( colon + 1 ) )
			Local paramInt% = 0
			
			Local fld:TField = id.findField( cmd )
			
			If( Left( param, 1 ) = "[" And Right( param, 1 ) = "]" )
				'array
				
				Local arrayObject: Object= fld.get( Self )
				Local arrayId:TTypeId = TTypeId.ForObject( arrayObject )
				Local length% = fld.typeId().ArrayLength( fld.get( Self ) , 0 )
				
				Local paramArray$[1]
				
				Local aparam$ = Mid( param , 2 , Len( param ) - 2 ) + "," 'trailing comma so the last element is parsed too
				
				For Local i% = 0 To length - 1
					Local acomma% = Instr( aparam , "," )
					paramArray[0] = Left( aparam , acomma - 1 )
					aparam = Right( aparam , Len( aparam ) - acomma )
					
					arrayId.setArrayElement( arrayObject , i , paramArray[0] )
										
				Next

			Else
				If( Left( param , 1 ) = Chr(34) And Right( param , 1 ) = Chr( 34 ) )
					'string
					param = Mid( param , 2 , Len( param ) - 2 )
					If fld Then fld.setString( Self , param )
				Else
					paramInt = Int( param )
					If fld Then fld.setInt( Self , paramInt )
				EndIf
			EndIf
			If Not fld And Len( cmd ) > 1 Then DebugLog( "Could not find field " + cmd )
			
			json = Right( json , Len( json ) - comma )
			comma = Instr( json , "," ) 
			square = Instr( json , "[" )
			If square > 0 And square < comma Then comma = Instr( json , "]" , square ) + 1

		Wend
		
	End Method
	
End Type
Type TPlayer Extends JSON
	
	Field _id$
	
	Field prefix$
	Field name$
	Field surname$
	Field suffix$
	
	Field race $, class$
	Field gender% , level% , face% , hair%
	Field strength% , strengthPercentile% , dexterity% , constitution% , inteligence% , wisdom% , charisma%
	Field thaco% , ac% , hp% , weaponDice%

	Field equipment%[20]
	

End Type


And here is some code that shows how I pull it all together to perform a Mongo fetch

Local Mongo:TMongo = New TMongo
If( Mongo.connect() )

	Local playerCollection:BSONCollection
	Mongo.use( "osric" )
	playerCollection = Mongo.get( "TPlayer" , "players" , "surname=Rose" )
	For Local player:TPlayer = EachIn playerCollection.documents
		DebugLog( player.getJSON() )
	Next
EndIf



Banshee(Posted 2014) [#2]
Fixed an issue with arrays and now supports returning multiple results, although the default is 1.

Also, a note I forgot to mention earlier: the reflection type whose name you pass to Mongo.get must have a Field _id$ property. This will hold the Mongo document's _id.
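
The reason is that the decoder looks each incoming field up by name and writes to it without checking whether the lookup succeeded, and every Mongo document carries an _id. A small standalone sketch of that lookup (the type names TWithId and TWithoutId and the helper HasField are hypothetical) shows what reflection reports for a type that lacks the field:

```blitzmax
SuperStrict

Type TWithId
	Field _id$
	Field name$
End Type

Type TWithoutId
	Field name$
End Type

' Report whether the named type exists and declares the named field
Function HasField%( typeName$ , fieldName$ )
	Local id:TTypeId = TTypeId.ForName( typeName )
	If Not id Then Return False
	' FindField gives back Null when the type has no such field
	Return id.FindField( fieldName ) <> Null
End Function

Print "TWithId has _id: " + HasField( "TWithId" , "_id" )
Print "TWithoutId has _id: " + HasField( "TWithoutId" , "_id" )
```

A check like this could be run before calling Mongo.get if you want to catch a missing field early rather than hit a null object error during decoding.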



Type TMongo

	Field tcpSocket:TSocket
	Field tcpStream:TStream
		
	Field database$
	Field buffer:TBank = CreateBank( 4 )
		
	Method setBufferSize( bufferSize% )
		Self.buffer.resize( bufferSize )
	End Method
		
	Method connect%( mongoHost$ = "localhost" , mongoPort% = 27017 )
	
		DebugLog( "Connecting to " + mongoHost + ":" + mongoPort )
	
		Self.tcpSocket = CreateTCPSocket()
		If( ConnectSocket( Self.tcpSocket , HostIp( mongoHost ) , mongoPort ) )
			DebugLog( "Connected" )
			Self.tcpStream = CreateSocketStream( Self.tcpSocket , False )
			Return True
		Else
			'Either the host name did not resolve or the server refused the connection
			DebugLog( "Could not connect to " + mongoHost + ":" + mongoPort )
			Return False
		EndIf
	
	End Method
	
	Method use( databaseName$ )
		Self.database = databaseName
	End Method
	
	Method get:BSONCollection( returnType$ , collection$ , query$ = "" , limitFields$ = "" , maxResults% = 1 )
		
		Local requestID% = Rand( 1 , $3FFFFFFF ) 'stay inside the positive Int range; 2^32 does not fit in an Int
		
		Local fullCollectionName$ = Self.database + "." + collection + cstring
		query = Self.QueryToBSON( Replace( query , "'" , Chr(34) ) )
		limitFields = Self.QueryToBSON( limitFields )
		
		Local size% = Len( query ) + Len( limitFields ) + Len( fullCollectionName ) + 28
		
		WriteInt( Self.tcpStream , size )
		WriteInt( Self.tcpStream , requestID )
		WriteInt( Self.tcpStream , 0 )
		WriteInt( Self.tcpStream , OP_QUERY )
		WriteInt( Self.tcpStream , 0 )'flags %100000 for full burst
		WriteString( Self.tcpStream , fullCollectionName ) 
		WriteInt( Self.tcpStream , 0 ) 'offset for pagination
		WriteInt( Self.tcpStream , maxResults ) 'limit
		WriteString( Self.tcpStream , query )
		WriteString( Self.tcpStream , limitFields )
		FlushStream( Self.tcpStream )
		
		Local output:BSONCollection= New BSONCollection
		output.setBSON( returnType , Self.listenFor( requestID ) )
		Return output
		
	End Method
	
	Method listenFor:TBankStream( requestID% )

		Local output:TBankStream

		Local gotHeader% = False
		Local gotData% = False
		
		Local messageLength% = 0

		Repeat	
				
			If Self.tcpSocket
			
				If Self.tcpSocket.ReadAvail() > 3
					
					If Not gotHeader
						gotHeader = True
						
						messageLength = ReadInt( Self.tcpStream ) - 16
						Local thisRequestID% = ReadInt( Self.tcpStream )
						Local thisResponseTo% = ReadInt( Self.tcpStream )
						Local opCode% = ReadInt( Self.tcpStream )
						If( thisResponseTo = requestID ) DebugLog( "This is my data, it is " + messageLength + " bytes long" )		
						
					endIf

					If gotHeader And Self.tcpSocket.readAvail() >= messageLength
						gotData = true
						Local bank:TBank = CreateBank( messageLength )
						output = CreateBankStream( bank )
						WriteString( output , ReadString ( Self.tcpStream , messageLength ) )
					EndIf

				EndIf
	
			EndIf
	
		Until gotData
		
		Return output
	
	End Method
	
	Method QuerytoBSON$( query$ )

		Local bson$ = ""
		
		If( Len( query ) = 0 )
		
			bson = Self.intString( 5 ) + Chr(0)
		
		else
		
			Local qPart$[2]
			Local equal% = Instr( query , "=" )
			qPart[0] = Left( query , equal - 1 ) + cstring
			qPart[1] = Right( query , Len( query ) - equal ) + cstring
			
			Local vLength% = Len( qPart[1] )
			bson = Chr( 2 ) + qPart[0] + Self.intString( vLength ) + qPart[1] + Chr(0)
			
			Local size% = Len( bson ) + 4
			bson = Self.intString( size ) + bson
		
		endIf
		
		Return bson
		
	End method
	Method intString$( val:Int )
		
		Local output$ = ""
		PokeInt( Self.buffer , 0 , val )
		
		For Local i% = 0 To 3
			output :+ chr( PeekByte( Self.buffer , i ) )
		next
		
		Return output
		
	End method
	
End Type

Type BSONCollection

	Field documents:TList = CreateList()

	Method setBSON( returnType$ , data:TBankStream )
		
		SeekStream( data , 0 )
		Local responseFlags% = ReadInt( data )
		Local cursorID:Long = ReadLong( data )
		Local startingFrom% = ReadInt( data )
		Local numberReturned% = ReadInt( data )
		
		If( numberReturned = 1 )

			Local obj:Object = Self.getDocument( returnType , data )
			ListAddLast( Self.documents , obj )

		Else

			For Local rows% = 1 To numberReturned
			
				Local obj:Object = Self.getDocument( returnType , data )
				ListAddLast( Self.documents , obj )
				
				If rows < numberReturned
					Local docSeparator%
					docSeparator =ReadShort( data )
				EndIf
			
				DebugLog( "---" )
			
			Next
			
		EndIf
		
		CloseStream( data )
	
	End Method
	Method getDocument:Object( returnType$ , data:TBankStream  )

		Local docsize% = ReadInt( data )
		DebugLog( "DOCSIZE " + docsize )
		
		Local objectType:TTypeId = TTypeId.ForName( returnType )
		Local obj:Object = objectType.newObject()
		
		Local cmd%

		'littleendianStream( data )

		Repeat
			cmd% = ReadByte( data )
			DebugLog( "CMD " + cmd )
		
			Select cmd
				Case 1
					Local name$ = ReadLine( data )
					Local value! = ReadDouble( data )
					
					Local fld:TField = objectType.findField( name )
					fld.setDouble( obj , value )
					
				Case 2
					Local name$ = ReadLine( data )
					Local length% = ReadInt( data )
					Local value$ = Left( ReadString( data , length ) , length - 1 )

					Local fld:TField = objectType.findField( name )
					fld.setString( obj , value )

				Case 4
					Local name$ = ReadLine( data )

					Local fld:TField = objectType.findField( name )
					Local arrayObject: Object= fld.get( obj )
					Local arrayId:TTypeId = TTypeId.ForObject( arrayObject )

					Local arrayParam$[1]
					Local pointerPosition% = StreamPos( data ) + ReadInt( data )
					Local arraySize% = 0
					
					Local arrCmd%
					Repeat
						arrCmd = ReadByte( data )
						Select arrCmd
							Case 1
								Local index% = Int( ReadLine( data ).toInt() )
								Local value! = ReadDouble( data )

								arrayParam[ 0 ] = String( Int( value ) )
								arrayId.setArrayElement( arrayObject , index , arrayParam[0] )

							Default
						End Select
					Until arrCmd = 0

					Local endArrayDocument% = ReadByte( data )	
					SeekStream( data , pointerPosition )				
				Case 7
					Local name$ = ReadLine( data )
					
					Local fld:TField = objectType.findField( name )
					Local value$ = ReadString( data , 12 )
				
					fld.setString( obj ,  value )
				Case 0
					'document end
				Default
					DebugLog( "Unprocessed " + cmd )
					
			End Select
			
		Until cmd = 0 Or Eof( data )
		
		SeekStream( data , StreamPos( data ) - 2 )
	
		'bigEndianStream( data )

		Return obj
		
	End Method
	
End Type