Mongo Database Driver for Blitz Max

Banshee(Posted 2014) [#1]
For those who do not know, the Mongo database system is a very fast, BSON/JSON-based NoSQL database. You can find out more about it here:

I've been working on a Mongo Driver for BlitzMax which has now reached the point that I can retrieve a document from a Mongo collection.

So yes, this is at an early stage: it is not feature complete and some functionality will definitely change.

I am not intending this as a full Mongo driver. I have very specific goals which I need to solve for my own purposes, but I see no reason why this code cannot be shared and made use of by other BlitzMax users who use Mongo databases.

So whilst I do not intend to support shards or clusters, I will be supporting these things:

* To select a single document from a collection based upon multiple search criteria.
* To select a collection of documents from a database based upon multiple search criteria.
* To insert new documents into a collection.
* To update documents within a collection based upon a specific _id.

I am unlikely to need cursors so probably won't be including them, which means if you want to do paginated output with large datasets you'll have to modify my work.


The basic principle of the connector is as follows:

There is a Mongo class which connects to localhost on the default port (27017) unless told otherwise. You can change the host and port when you call connect.

Local Mongo:TMongo = New TMongo
Mongo.connect( <server$ optional> , <port optional> )

It will return true if connected, and false if there was an error.

To select a database, call the use method

Mongo.use( "myDatabase" )

And here is where the concepts are a little different from most other things. Instead of calling a query and getting an array of rows back, you pass the name of a custom type as a parameter to the query, and your results are returned cast to that type. I've done it this way because Mongo has a flexible internal format.

You get back a collection, which I've called a BSONCollection as it processes all of the Mongo internal BSON code.

Clearly I need to explain this better, so...

Type TMyOwnObjectFormat
    field myVar , myOtherVar$
end Type

Local playerCollection:BSONCollection
playerCollection = Mongo.get( "TMyOwnObjectFormat" , <some other params I've not explained yet> )
For Local row:TMyOwnObjectFormat = EachIn playerCollection.documents
    'each row I get back is in my own data format! Cool :D
Next
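
To make the "results cast to your own type" idea concrete, here is a minimal sketch of the same pattern. It is written in Python rather than BlitzMax purely for illustration, and the names `MyOwnObjectFormat` and `hydrate` are hypothetical, not part of the driver. It assumes the document has already been decoded into name/value pairs.

```python
from dataclasses import dataclass, fields

@dataclass
class MyOwnObjectFormat:
    # Mirrors the two fields of TMyOwnObjectFormat above (an Int and a String).
    myVar: int = 0
    myOtherVar: str = ""

def hydrate(cls, document):
    """Create a cls instance and copy matching keys from a decoded document."""
    obj = cls()
    known = {f.name: f.type for f in fields(cls)}
    for key, value in document.items():
        if key in known:  # keys the type does not declare are skipped
            # Convert to the declared type, e.g. a BSON double 5.0 becomes Int 5.
            setattr(obj, key, known[key](value))
    return obj

row = hydrate(MyOwnObjectFormat, {"myVar": 5.0, "myOtherVar": "hello", "extra": 1})
```

Skipping undeclared keys is a choice made for this sketch. The driver as posted looks fields up by name without checking the result, so a document key with no matching Field would likely need a similar guard.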

So there are a few more parameters we need for a successful query. As well as the reflection class we already specified, we need to give the name of the collection to fetch from, which is a simple string, and then the search parameter.

Now the search parameter is definitely going to change because I want to provide multiple parameters, but currently it's a simple string holding a name and value pair separated by an equals sign.

So if I wanted to search for a variable called MongoIsGreat with a value of 5, I would give "MongoIsGreat=5"
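
As a rough illustration of what such a pair turns into on the wire, the following Python sketch encodes a "name=value" string as a BSON document with a single string element, following the same layout the driver's QueryToBSON method builds. The function name `query_to_bson` is hypothetical and the snippet is not part of the driver.

```python
import struct

def query_to_bson(query: str) -> bytes:
    """Encode "name=value" as a BSON document holding one string element."""
    if not query:
        # Empty document: 4-byte size (5) followed by the terminator byte.
        return struct.pack("<i", 5) + b"\x00"
    name, _, value = query.partition("=")
    value_bytes = value.encode("utf-8") + b"\x00"   # string payload, null-terminated
    element = (
        b"\x02"                                     # element type 0x02 = UTF-8 string
        + name.encode("utf-8") + b"\x00"            # field name as a cstring
        + struct.pack("<i", len(value_bytes))       # string length, terminator included
        + value_bytes
    )
    body = element + b"\x00"                        # document terminator
    return struct.pack("<i", len(body) + 4) + body  # size prefix counts itself

encoded = query_to_bson("MongoIsGreat=5")
```

Note that the value is always sent as a string, so this query would match a field holding the text "5" and not one holding the number 5.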

At the moment the connector only returns a single document. In fact this is the only functionality I have completed so far, as I only started last night.

So here goes - this is the Mongo Driver
Const	OP_REPLY%		= 1	'Reply To a client request. responseTo is set
Const	OP_MSG%		= 1000	'generic msg command followed by a String
Const	OP_UPDATE%		= 2001	'update document
Const	OP_INSERT%		= 2002	'insert New document
Const	RESERVED%		= 2003	'formerly used For OP_GET_BY_OID
Const	OP_QUERY%		= 2004	'query a collection
Const	OP_GET_MORE%	= 2005	'Get more data from a query. See Cursors
Const	OP_DELETE%		= 2006	'Delete documents
Const	OP_KILL_CURSORS%	= 2007	'Tell database client is done with a cursor
Const cstring$			= Chr(0)

Type TMongo

	Field tcpSocket:TSocket
	Field tcpStream:TStream
	Field database$
	Field buffer:TBank = CreateBank( 4 )
	Method setBufferSize( bufferSize% )
		Self.buffer.resize( bufferSize )
	End Method
	Method connect%( mongoHost$ = "localhost" , mongoPort% = 27017 )
		DebugLog( "Connecting to " + mongoHost + ":" + mongoPort )
		Self.tcpSocket:TSocket = CreateTCPSocket()
		If( ConnectSocket( tcpSocket , HostIp( mongoHost ) , mongoPort ) )
			DebugLog( "Connection" )
			Self.tcpStream = CreateSocketStream( Self.tcpSocket , False )
			Return True
		Else
			DebugLog( "Could not resolve host" )
			Return False
		EndIf
	End Method
	Method use( databaseName$ )
		Self.database = databaseName
	End Method
	Method get:BSONCollection( returnType$ , collection$ , query$ = "" , limitFields$ = "" , maxResults% = 1 )
		Local requestID% = Rand( 1 , $7FFFFFFF ) 'must fit a signed 32-bit Int; 2^32 would overflow it
		Local fullCollectionName$ = Self.database + "." + collection + cstring
		query = Self.QueryToBSON( Replace( query , "'" , Chr(34) ) )
		limitFields = Self.QueryToBSON( limitFields )
		Local size% = Len( query ) + Len( limitFields ) + Len( fullCollectionName ) + 28
		WriteInt( Self.tcpStream , size )
		WriteInt( Self.tcpStream , requestID )
		WriteInt( Self.tcpStream , 0 )
		WriteInt( Self.tcpStream , OP_QUERY )
		WriteInt( Self.tcpStream , 0 )'flags %100000 for full burst
		WriteString( Self.tcpStream , fullCollectionName ) 
		WriteInt( Self.tcpStream , 0 ) 'offset for pagination
		WriteInt( Self.tcpStream , maxResults ) 'limit
		WriteString( Self.tcpStream , query )
		WriteString( Self.tcpStream , limitFields )
		FlushStream( Self.tcpStream )
		Local output:BSONCollection= New BSONCollection
		output.setBSON( returnType , Self.listenFor( requestID ) )
		Return output
	End Method
	Method listenFor:TBankStream( requestID% )

		Local output:TBankStream

		Local gotHeader% = False
		Local gotData% = False
		Local messageLength% = 0

		Repeat
			If Self.tcpSocket
				If Self.tcpSocket.ReadAvail() > 3
					If Not gotHeader
						'standard message header: length, requestID, responseTo, opCode
						gotHeader = True
						messageLength = ReadInt( Self.tcpStream ) - 16
						Local thisRequestID% = ReadInt( Self.tcpStream )
						Local thisResponseTo% = ReadInt( Self.tcpStream )
						Local opCode% = ReadInt( Self.tcpStream )
						If( thisResponseTo = requestID ) DebugLog( "This is my data, it is " + messageLength + " bytes long" )
					EndIf

					If gotHeader And Self.tcpSocket.ReadAvail() >= messageLength
						gotData = True
						Local bank:TBank = CreateBank( messageLength )
						output = CreateBankStream( bank )
						WriteString( output , ReadString( Self.tcpStream , messageLength ) )
					EndIf
				EndIf
			EndIf
		Until gotData
		Return output
	End Method
	Method QuerytoBSON$( query$ )

		Local bson$ = ""
		If( Len( query ) = 0 )
			'empty document: 4-byte size plus the terminator
			bson = Self.intString( 5 ) + Chr(0)
		Else
			Local qPart$[2]
			Local equal% = Instr( query , "=" )
			qPart[0] = Left( query , equal - 1 ) + cstring
			qPart[1] = Right( query , Len( query ) - equal ) + cstring
			Local vLength% = Len( qPart[1] )
			'type 2 (string) element, then the document terminator
			bson = Chr( 2 ) + qPart[0] + Self.intString( vLength ) + qPart[1] + Chr(0)
			Local size% = Len( bson ) + 4
			bson = Self.intString( size ) + bson
		EndIf
		Return bson
	End method
	Method intString$( val:Int )
		Local output$ = ""
		PokeInt( Self.buffer , 0 , val )
		For Local i% = 0 To 3
			output :+ Chr( PeekByte( Self.buffer , i ) )
		Next
		Return output
	End method
End Type
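
For readers who want to check the byte layout that the get method writes, here is a small Python sketch of the same OP_QUERY framing: a 16-byte header (length, requestID, responseTo, opCode) followed by flags, the null-terminated collection name, skip, limit and the query document. The function name `build_op_query` is hypothetical. Unlike the driver, this sketch omits the optional field selector by default. OP_QUERY is a legacy opcode, so whether a given server version still accepts it is something to verify separately.

```python
import struct

OP_QUERY = 2004

def build_op_query(request_id, full_collection_name, query_doc,
                   fields_doc=b"", skip=0, limit=1, flags=0):
    """Frame an OP_QUERY message; every integer is little-endian int32."""
    name = full_collection_name.encode("utf-8") + b"\x00"
    body = (
        struct.pack("<i", flags)
        + name
        + struct.pack("<ii", skip, limit)
        + query_doc
        + fields_doc            # optional field selector, empty when unused
    )
    # The length field covers the whole message, header included.
    header = struct.pack("<iiii", 16 + len(body), request_id, 0, OP_QUERY)
    return header + body

EMPTY_DOC = b"\x05\x00\x00\x00\x00"
msg = build_op_query(1, "osric.players", EMPTY_DOC)
```

The constant 28 in the driver's size calculation corresponds to the 16 header bytes plus the three 4-byte integers for flags, skip and limit.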

Type BSONCollection

	Field documents:TList = CreateList()

	Method setBSON( returnType$ , data:TBankStream )
		SeekStream( data , 0 )
		Local responseFlags% = ReadInt( data )
		Local cursorID:Long = ReadLong( data )
		Local startingFrom% = ReadInt( data )
		Local numberReturned% = ReadInt( data )
		If( numberReturned = 1 )

			Local obj:Object = Self.getDocument( returnType , data )
			ListAddLast( Self.documents , obj )

		Else
			DebugLog( "Multiple return objects" )
		EndIf
		CloseStream( data )
	End method
	Method getDocument:Object( returnType$ , data:TBankStream  )
		Local objectType:TTypeId = TTypeId.ForName( returnType )
		Local obj:Object = objectType.newObject()
		Local cmd%
		Local docsize% = ReadInt( data )

		'littleendianStream( data )

		Repeat
			cmd% = ReadByte( data )
			Select cmd
				Case 1
					Local name$ = ReadLine( data )
					Local value! = ReadDouble( data )
					DebugLog( name + " = " + value )

					Local fld:TField = objectType.findField( name )
					fld.setDouble( obj , value )
				Case 2
					Local name$ = ReadLine( data )
					Local length% = ReadInt( data )
					Local value$ = Left( ReadString( data , length ) , length - 1 )

					DebugLog( name + " = " + value )

					Local fld:TField = objectType.findField( name )
					fld.setString( obj , value )
				Case 4
					Local name$ = ReadLine( data )

					Local fld:TField = objectType.findField( name )
					Local arrayObject: Object= fld.get( obj )
					Local arrayId:TTypeId = TTypeId.ForObject( arrayObject )

					Local arrayParam$[1]
					Local arrayByteSize% = ReadInt( data )
					Local arraySize% = 0
					Local arrCmd%
					Repeat
						arrCmd = ReadByte( data )
						Select arrCmd
							Case 1
								Local index% = Int( ReadLine( data ).toInt() )
								Local value! = ReadDouble( data )

								arrayParam[ 0 ] = String( Int( value ) )
								DebugLog( name +"[ " + index + " ] = " + arrayParam[0] )
								arrayId.setArrayElement( arrayObject , index , arrayParam[0] )

						End select
					Until arrCmd = 0

					DebugLog( "Done Array " + name )	
					Local endArrayDocument% = ReadByte( data )					
				Case 7
					Local name$ = ReadLine( data )
					Local fld:TField = objectType.findField( name )
					fld.setString( obj , ReadString( data , 12 ) )
				Default
					DebugLog( "Unprocessed command " + cmd )
			End Select
		Until cmd = 0 Or Eof( data )
		'bigEndianStream( data )

		Return obj
	End Method
End Type
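
The reply side can be sketched the same way. The Python snippet below unpacks the 16-byte message header and the four OP_REPLY fields that setBSON reads (responseFlags, cursorID, startingFrom, numberReturned), then returns the remaining bytes as the document payload. The function name `parse_op_reply` and the synthetic reply are assumptions made for illustration only.

```python
import struct

def parse_op_reply(message: bytes) -> dict:
    """Split an OP_REPLY message into header fields and raw document bytes."""
    length, request_id, response_to, op_code = struct.unpack_from("<iiii", message, 0)
    # cursorID is a 64-bit integer, the other three fields are int32.
    flags, cursor_id, starting_from, number_returned = struct.unpack_from("<iqii", message, 16)
    return {
        "response_to": response_to,
        "op_code": op_code,
        "cursor_id": cursor_id,
        "number_returned": number_returned,
        "documents": message[36:length],   # 16 header bytes + 20 reply bytes
    }

# A synthetic reply carrying one empty document, answering request 7.
body = struct.pack("<iqii", 0, 0, 0, 1) + b"\x05\x00\x00\x00\x00"
reply = struct.pack("<iiii", 16 + len(body), 99, 7, 1) + body
parsed = parse_op_reply(reply)
```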

To test it I use my JSON serialisation superclass and a small sample object which I have data for. I include them here only to demonstrate how I am using this, not as part of the connector.

Type JSON
	Method getJSON$()
		Local json$
		Local id:TTypeId = TTypeId.ForObject( Self )
		For Local fld:TField = EachIn id.EnumFields()
			Local fieldName$ = Chr( 34 ) + fld.Name() + Chr( 34 ) + ":"
			Local elementName$ = fld.TypeId().Name()
			If( Right( elementName , 2 ) = "[]" )
				json :+ fieldName + "["
				Local arrayObject: Object= fld.get( Self )
				Local arrayId:TTypeId = TTypeId.ForObject( arrayObject )
				Local length% = fld.typeId().ArrayLength( fld.get( Self ) , 0 )
				For Local i% = 0 To length - 1
					Local arrayElement:Object = arrayId.getArrayElement( arrayObject , i )
					json :+ arrayElement.toString() + ","
				Next
				json = Left( json , Len( json ) - 1 ) + "],"
			Else
				Select elementName
					Case "String"
						json :+ fieldName + Chr(34) + fld.getString$( Self ) + Chr(34) + ","
					Case "Int"
						json :+ fieldName + fld.getInt( Self ) + ","
					Default
						DebugLog( fld.Name() +" is of unprocessed type " + fld.typeId().Name() )
				End Select
			EndIf
		Next
		Return "{" + Left( json , Len( json ) - 1 ) + "}"

	End Method
	Method setJSON( json$ )
		json = Left( Right( json , Len( json ) - 1 ) , Len( json ) - 1 ) + ","
		Local comma% = Instr( json , "," )
		Local square% = Instr( json , "[" )
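		'only skip to the closing bracket when an array actually starts before the next comma
		If square > 0 And square < comma Then comma = Instr( json , "]" , square ) + 1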
		Local id:TTypeId = TTypeId.ForObject( Self )
		While( comma > 0 )

			Local this$ = Left( json , comma )
			Local colon% = Instr( this , ":" )
			Local cmd$ = Mid( this , 2 , colon - 3 )
			Local param$ = Mid( this , colon + 1 , Len( this ) - ( colon + 1 ) )
			Local paramInt% = 0
			Local fld:TField = id.findField( cmd )
			If( Left( param, 1 ) = "[" And Right( param, 1 ) = "]" )
				Local arrayObject: Object= fld.get( Self )
				Local arrayId:TTypeId = TTypeId.ForObject( arrayObject )
				Local length% = fld.typeId().ArrayLength( fld.get( Self ) , 0 )
				Local paramArray$[1]
				Local aparam$ = Mid( param , 2 , Len( param ) - 2 )
				For Local i% = 0 To length - 1
					Local acomma% = Instr( aparam , "," )
					paramArray[0] = Left( aparam , acomma - 1 )
					aparam = Right( aparam , Len( aparam ) - acomma )
					arrayId.setArrayElement( arrayObject , i , paramArray[0] )
				Next
			Else
				If( Left( param , 1 ) = Chr(34) And Right( param , 1 ) = Chr( 34 ) )
					param = Mid( param , 2 , Len( param ) - 2 )
					If fld Then fld.setString( Self , param )
				Else
					paramInt = Int( param )
					If fld Then fld.setInt( Self , paramInt )
				EndIf
			EndIf
			If Not fld And Len( cmd ) > 1 Then DebugLog( "Could not find field " + cmd )
			json = Right( json , Len( json ) - comma )
			comma = Instr( json , "," )
			square = Instr( json , "[" )
			'only skip to the closing bracket when an array actually starts before the next comma
			If square > 0 And square < comma Then comma = Instr( json , "]" , square ) + 1
		Wend
	End Method
End Type
Type TPlayer Extends JSON
	Field _id$
	Field prefix$
	Field name$
	Field surname$
	Field suffix$
	Field race $, class$
	Field gender% , level% , face% , hair%
	Field strength% , strengthPercentile% , dexterity% , constitution% , inteligence% , wisdom% , charisma%
	Field thaco% , ac% , hp% , weaponDice%

	Field equipment%[20]

End Type

And here is some code that shows how I pull it all together to perform a Mongo fetch

Local Mongo:TMongo = New TMongo
If( Mongo.connect() )

	Local playerCollection:BSONCollection
	Mongo.use( "osric" )
	playerCollection = Mongo.get( "Tplayer" , "players" , "surname=Rose" )
	For Local player:TPlayer = EachIn playerCollection.documents
		DebugLog( player.getJSON() )
	Next
EndIf

Banshee(Posted 2014) [#2]
Fixed an issue with arrays and now supports returning multiple results, although the default is 1.

Also, a note I forgot to mention earlier: the reflection class that you pass to Mongo.get must have a Field _id$ property. This will hold the Mongo document's _id.
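
One practical point about that _id field: the driver reads the ObjectId as 12 raw bytes into a string, which is awkward to print or compare by eye. ObjectIds are conventionally shown as 24 hexadecimal characters, and a conversion could look like the Python sketch below. The function name `object_id_to_hex` is hypothetical and is not something the driver provides.

```python
def object_id_to_hex(raw: bytes) -> str:
    """Render a 12-byte BSON ObjectId as its usual 24-character hex form."""
    if len(raw) != 12:
        raise ValueError("an ObjectId is exactly 12 bytes")
    return raw.hex()

sample = object_id_to_hex(bytes(range(12)))
```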

Type TMongo

	Field tcpSocket:TSocket
	Field tcpStream:TStream
	Field database$
	Field buffer:TBank = CreateBank( 4 )
	Method setBufferSize( bufferSize% )
		Self.buffer.resize( bufferSize )
	End Method
	Method connect%( mongoHost$ = "localhost" , mongoPort% = 27017 )
		DebugLog( "Connecting to " + mongoHost + ":" + mongoPort )
		Self.tcpSocket:TSocket = CreateTCPSocket()
		If( ConnectSocket( tcpSocket , HostIp( mongoHost ) , mongoPort ) )
			DebugLog( "Connection" )
			Self.tcpStream = CreateSocketStream( Self.tcpSocket , False )
			Return True
		Else
			DebugLog( "Could not resolve host" )
			Return False
		EndIf
	End Method
	Method use( databaseName$ )
		Self.database = databaseName
	End Method
	Method get:BSONCollection( returnType$ , collection$ , query$ = "" , limitFields$ = "" , maxResults% = 1 )
		Local requestID% = Rand( 1 , $7FFFFFFF ) 'must fit a signed 32-bit Int; 2^32 would overflow it
		Local fullCollectionName$ = Self.database + "." + collection + cstring
		query = Self.QueryToBSON( Replace( query , "'" , Chr(34) ) )
		limitFields = Self.QueryToBSON( limitFields )
		Local size% = Len( query ) + Len( limitFields ) + Len( fullCollectionName ) + 28
		WriteInt( Self.tcpStream , size )
		WriteInt( Self.tcpStream , requestID )
		WriteInt( Self.tcpStream , 0 )
		WriteInt( Self.tcpStream , OP_QUERY )
		WriteInt( Self.tcpStream , 0 )'flags %100000 for full burst
		WriteString( Self.tcpStream , fullCollectionName ) 
		WriteInt( Self.tcpStream , 0 ) 'offset for pagination
		WriteInt( Self.tcpStream , maxResults ) 'limit
		WriteString( Self.tcpStream , query )
		WriteString( Self.tcpStream , limitFields )
		FlushStream( Self.tcpStream )
		Local output:BSONCollection= New BSONCollection
		output.setBSON( returnType , Self.listenFor( requestID ) )
		Return output
	End Method
	Method listenFor:TBankStream( requestID% )

		Local output:TBankStream

		Local gotHeader% = False
		Local gotData% = False
		Local messageLength% = 0

		Repeat
			If Self.tcpSocket
				If Self.tcpSocket.ReadAvail() > 3
					If Not gotHeader
						'standard message header: length, requestID, responseTo, opCode
						gotHeader = True
						messageLength = ReadInt( Self.tcpStream ) - 16
						Local thisRequestID% = ReadInt( Self.tcpStream )
						Local thisResponseTo% = ReadInt( Self.tcpStream )
						Local opCode% = ReadInt( Self.tcpStream )
						If( thisResponseTo = requestID ) DebugLog( "This is my data, it is " + messageLength + " bytes long" )
					EndIf

					If gotHeader And Self.tcpSocket.ReadAvail() >= messageLength
						gotData = True
						Local bank:TBank = CreateBank( messageLength )
						output = CreateBankStream( bank )
						WriteString( output , ReadString( Self.tcpStream , messageLength ) )
					EndIf
				EndIf
			EndIf
		Until gotData
		Return output
	End Method
	Method QuerytoBSON$( query$ )

		Local bson$ = ""
		If( Len( query ) = 0 )
			'empty document: 4-byte size plus the terminator
			bson = Self.intString( 5 ) + Chr(0)
		Else
			Local qPart$[2]
			Local equal% = Instr( query , "=" )
			qPart[0] = Left( query , equal - 1 ) + cstring
			qPart[1] = Right( query , Len( query ) - equal ) + cstring
			Local vLength% = Len( qPart[1] )
			'type 2 (string) element, then the document terminator
			bson = Chr( 2 ) + qPart[0] + Self.intString( vLength ) + qPart[1] + Chr(0)
			Local size% = Len( bson ) + 4
			bson = Self.intString( size ) + bson
		EndIf
		Return bson
	End method
	Method intString$( val:Int )
		Local output$ = ""
		PokeInt( Self.buffer , 0 , val )
		For Local i% = 0 To 3
			output :+ Chr( PeekByte( Self.buffer , i ) )
		Next
		Return output
	End method
End Type

Type BSONCollection

	Field documents:TList = CreateList()

	Method setBSON( returnType$ , data:TBankStream )
		SeekStream( data , 0 )
		Local responseFlags% = ReadInt( data )
		Local cursorID:Long = ReadLong( data )
		Local startingFrom% = ReadInt( data )
		Local numberReturned% = ReadInt( data )
		If( numberReturned = 1 )

			Local obj:Object = Self.getDocument( returnType , data )
			ListAddLast( Self.documents , obj )

		Else

			For Local rows% = 1 To numberReturned
				Local obj:Object = Self.getDocument( returnType , data )
				ListAddLast( Self.documents , obj )
				If rows < numberReturned
					Local docSeparator%
					docSeparator = ReadShort( data )
				EndIf
				DebugLog( "---" )
			Next
		EndIf
		CloseStream( data )
	End Method
	Method getDocument:Object( returnType$ , data:TBankStream  )

		Local docsize% = ReadInt( data )
		DebugLog( "DOCSIZE " + docsize )
		Local objectType:TTypeId = TTypeId.ForName( returnType )
		Local obj:Object = objectType.newObject()
		Local cmd%

		'littleendianStream( data )

		Repeat
			cmd% = ReadByte( data )
			DebugLog( "CMD " + cmd )
			Select cmd
				Case 1
					Local name$ = ReadLine( data )
					Local value! = ReadDouble( data )
					Local fld:TField = objectType.findField( name )
					fld.setDouble( obj , value )
				Case 2
					Local name$ = ReadLine( data )
					Local length% = ReadInt( data )
					Local value$ = Left( ReadString( data , length ) , length - 1 )

					Local fld:TField = objectType.findField( name )
					fld.setString( obj , value )

				Case 4
					Local name$ = ReadLine( data )

					Local fld:TField = objectType.findField( name )
					Local arrayObject: Object= fld.get( obj )
					Local arrayId:TTypeId = TTypeId.ForObject( arrayObject )

					Local arrayParam$[1]
					Local pointerPosition% = StreamPos( data ) + ReadInt( data )
					Local arraySize% = 0
					Local arrCmd%
					Repeat
						arrCmd = ReadByte( data )
						Select arrCmd
							Case 1
								Local index% = Int( ReadLine( data ).toInt() )
								Local value! = ReadDouble( data )

								arrayParam[ 0 ] = String( Int( value ) )
								arrayId.setArrayElement( arrayObject , index , arrayParam[0] )

						End Select
					Until arrCmd = 0

					Local endArrayDocument% = ReadByte( data )	
					SeekStream( data , pointerPosition )				
				Case 7
					Local name$ = ReadLine( data )
					Local fld:TField = objectType.findField( name )
					Local value$ = ReadString( data , 12 )
					fld.setString( obj ,  value )
				Case 0
					'document end
				Default
					DebugLog( "Unprocessed " + cmd )
			End Select
		Until cmd = 0 Or Eof( data )
		SeekStream( data , StreamPos( data ) - 2 )
		'bigEndianStream( data )

		Return obj
	End Method
End Type
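
When a reply holds several documents, the BSON format packs them back to back, and each one starts with its own 4-byte size. The Python sketch below uses that size prefix to split the payload and then decodes documents containing only doubles (type 1) and strings (type 2). It is an alternative way of walking the data shown for comparison, not the approach the driver takes, and the names `split_documents` and `decode_flat` are hypothetical.

```python
import struct

def split_documents(payload: bytes, count: int) -> list:
    """Cut a reply payload into `count` documents using each size prefix."""
    docs, pos = [], 0
    for _ in range(count):
        (size,) = struct.unpack_from("<i", payload, pos)
        docs.append(payload[pos:pos + size])
        pos += size
    return docs

def decode_flat(doc: bytes) -> dict:
    """Decode a document holding only doubles (0x01) and strings (0x02)."""
    out, pos = {}, 4                       # skip the size prefix
    while doc[pos] != 0:                   # 0x00 terminates the document
        kind = doc[pos]
        end = doc.index(b"\x00", pos + 1)  # field name is a cstring
        name = doc[pos + 1:end].decode("utf-8")
        pos = end + 1
        if kind == 0x01:
            (out[name],) = struct.unpack_from("<d", doc, pos)
            pos += 8
        elif kind == 0x02:
            (n,) = struct.unpack_from("<i", doc, pos)
            out[name] = doc[pos + 4:pos + 4 + n - 1].decode("utf-8")
            pos += 4 + n
        else:
            raise ValueError("element type %d not handled in this sketch" % kind)
    return out

# Two hand-built documents: {"surname": "Rose"} and {"level": 3.0}.
doc_a = struct.pack("<i", 23) + b"\x02surname\x00" + struct.pack("<i", 5) + b"Rose\x00" + b"\x00"
doc_b = struct.pack("<i", 20) + b"\x01level\x00" + struct.pack("<d", 3.0) + b"\x00"
decoded = [decode_flat(d) for d in split_documents(doc_a + doc_b, 2)]
```

Because every document announces its own length, no seeking backwards or separator handling is needed to find where the next one begins.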