MessageHandler

Package: MachII.framework
Handles processing of message subscribers from publish commands.

<!---
License:
Copyright 2008 GreatBizTools, LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Copyright: GreatBizTools, LLC
Author: Peter J. Farrell (peter@mach-ii.com)
$Id: MessageHandler.cfc 962 2008-08-12 18:18:04Z peterfarrell $

Created version: 1.6.0
Updated version: 1.6.0

Notes:
--->

Method Summary
public MessageHandler init(string messageName, boolean multithreaded, boolean waitForThreads, numeric timeout, ThreadingAdapter threadingAdapter)

Initializes the handler.

public void addMessageSubscriber([MessageSubscriber messageSubscriber])

Registers a subscriber to this message.

public Log getLog()

Gets the log.

public string getMessageName()
public struct getMessageSubscribers()

Gets all message subscribers.

public boolean getMultithreaded()
public array getSubscriberNames()

Gets an array of message subscriber invoker names.

private ThreadingAdapter getThreadingAdapter()

Gets a threading adapter.

public numeric getTimeout()
public boolean getWaitForThreads()
public boolean handleMessage(Event event, EventContext eventContext)

Handles the message.

public void setLog(LogFactory logFactory)

Uses the log factory to create a log.

private void setMessageName(string messageName)
private void setMultithreaded(boolean multithreaded)
private void setThreadingAdapter(ThreadingAdapter threadingAdapter)

Sets a threading adapter.

private void setTimeout(numeric timeout)
private void setWaitForThreads(boolean waitForThreads)
Method Detail
addMessageSubscriber

public void addMessageSubscriber( [MessageSubscriber messageSubscriber] )

Registers a subscriber to this message.

Parameters:
[MessageSubscriber messageSubscriber]

Code:

	<cffunction name="addMessageSubscriber" access="public" returntype="void" output="false"
		hint="Registers a subscriber to this message.">
		<cfargument name="messageSubscriber" type="MachII.framework.MessageSubscriber" required="true"
			hint="The subscriber to register. It is dereferenced unconditionally below, so it must always be supplied." />

		<!---
		The registry key is the subscriber's listener name and method joined by an
		underscore. Registering the same listener/method pair again replaces the
		earlier entry because the struct key is identical.
		--->
		<cfset var key = arguments.messageSubscriber.getListenerName() & "_" & arguments.messageSubscriber.getMethod() />

		<!---
		NOTE(review): variables.messageSubscribers is not initialized by any method
		visible here; presumably it is created in the component's pseudo-constructor.
		Confirm before relying on it.
		--->
		<cfset variables.messageSubscribers[key] = arguments.messageSubscriber />
	</cffunction>

getLog

public Log getLog( )

Gets the log.

Parameters:

Code:

	<cffunction
		name="getLog"
		access="public"
		returntype="MachII.logging.Log"
		output="false"
		hint="Gets the log.">
		<!--- Hand back the log instance held in variables.log. --->
		<cfset var currentLog = variables.log />
		<cfreturn currentLog />
	</cffunction>

getMessageName

public string getMessageName( )

Parameters:

Code:

	<cffunction
		name="getMessageName"
		access="public"
		returntype="string"
		output="false"
		hint="Gets the name of the message this handler is responsible for.">
		<!--- Plain accessor for variables.messageName. --->
		<cfset var currentName = variables.messageName />
		<cfreturn currentName />
	</cffunction>

getMessageSubscribers

public struct getMessageSubscribers( )

Gets all message subscribers.

Parameters:

Code:

	<cffunction
		name="getMessageSubscribers"
		access="public"
		returntype="struct"
		output="false"
		hint="Gets all message subscribers.">
		<!---
		Returns the subscriber registry itself; no Duplicate() is applied here,
		so the caller receives the same struct this component stores.
		--->
		<cfset var registry = variables.messageSubscribers />
		<cfreturn registry />
	</cffunction>

getMultithreaded

public boolean getMultithreaded( )

Parameters:

Code:

	<cffunction
		name="getMultithreaded"
		access="public"
		returntype="boolean"
		output="false"
		hint="Gets the multithreaded flag stored for this handler.">
		<!--- Plain accessor for variables.multithreaded. --->
		<cfset var flag = variables.multithreaded />
		<cfreturn flag />
	</cffunction>

getSubscriberNames

public array getSubscriberNames( )

Gets an array of message subscriber invoker names.

Parameters:

Code:

	<cffunction
		name="getSubscriberNames"
		access="public"
		returntype="array"
		output="false"
		hint="Gets an array of message subscriber invoker names.">
		<!--- The names are simply the keys of the subscriber registry struct. --->
		<cfset var names = StructKeyArray(variables.messageSubscribers) />
		<cfreturn names />
	</cffunction>

getThreadingAdapter

private ThreadingAdapter getThreadingAdapter( )

Gets a threading adapter.

Parameters:

Code:

	<cffunction
		name="getThreadingAdapter"
		access="private"
		returntype="MachII.util.threading.ThreadingAdapter"
		output="false"
		hint="Gets a threading adapter.">
		<!--- Hand back the adapter instance held in variables.threadingAdapter. --->
		<cfset var adapter = variables.threadingAdapter />
		<cfreturn adapter />
	</cffunction>

getTimeout

public numeric getTimeout( )

Parameters:

Code:

	<cffunction
		name="getTimeout"
		access="public"
		returntype="numeric"
		output="false"
		hint="Gets the timeout stored for this handler.">
		<!---
		Plain accessor for variables.timeout.
		NOTE(review): the unit is not established in this component; the value is
		handed to the threading adapter's join() by handleMessage(). Confirm there.
		--->
		<cfset var currentTimeout = variables.timeout />
		<cfreturn currentTimeout />
	</cffunction>

getWaitForThreads

public boolean getWaitForThreads( )

Parameters:

Code:

	<cffunction
		name="getWaitForThreads"
		access="public"
		returntype="boolean"
		output="false"
		hint="Gets the wait-for-threads flag stored for this handler.">
		<!--- Plain accessor for variables.waitForThreads. --->
		<cfset var flag = variables.waitForThreads />
		<cfreturn flag />
	</cffunction>

handleMessage

public boolean handleMessage( Event event, EventContext eventContext )

Handles the message.

Parameters:
Event event
EventContext eventContext

Code:

	<cffunction name="handleMessage" access="public" returntype="boolean" output="false"
		hint="Handles the message.">
		<cfargument name="event" type="MachII.framework.Event" required="true" />
		<cfargument name="eventContext" type="MachII.framework.EventContext" required="true" />

		<!---
		Invokes every registered subscriber for this message, either on the
		current thread (serial) or through the threading adapter (multi-threaded).
		Returns false only when joined threads reported at least one error;
		every other path returns true.
		--->
		<cfset var listeners = getMessageSubscribers() />
		<cfset var adapter = getThreadingAdapter() />
		<!--- Maps each value returned by adapter.run() to the subscriber key it was started for. --->
		<cfset var launchedThreads = StructNew() />
		<cfset var pendingThreadIds = arguments.event.getArg("_publishThreadIds", StructNew()) />
		<cfset var threadArgs = StructNew() />
		<cfset var results = StructNew() />
		<cfset var wrappedException = "" />
		<cfset var keepProcessing = true />
		<cfset var logger = getLog() />
		<cfset var subscriberKey = "" />

		<!---
		Serial path: taken when this handler is not multithreaded or the adapter
		does not allow threading. Subscribers run one after another and any
		exception they throw propagates to the caller.
		--->
		<cfif NOT getMultithreaded() OR NOT adapter.allowThreading()>
			<cfif logger.isDebugEnabled()>
				<cfset logger.debug("Received published message named '#getMessageName()#' (running in serial).") />
			</cfif>

			<cfloop collection="#listeners#" item="subscriberKey">
				<cfset listeners[subscriberKey].invokeListener(arguments.event) />
			</cfloop>

			<cfreturn keepProcessing />
		</cfif>

		<!--- Multi-threaded path from here on. --->
		<cfif logger.isDebugEnabled()>
			<cfset logger.debug("Received published message named '#getMessageName()#' (running in multi-threaded).") />
		</cfif>

		<!--- Every thread receives the same parameter struct carrying the event. --->
		<cfset threadArgs.event = arguments.event />

		<cfloop collection="#listeners#" item="subscriberKey">
			<cfset launchedThreads[adapter.run(listeners[subscriberKey], "invokeListener", threadArgs)] = subscriberKey />
		</cfloop>

		<!---
		Fire-and-forget: merge the launched thread ids into the event's
		"_publishThreadIds" arg and return without joining.
		--->
		<cfif NOT getWaitForThreads()>
			<cfif logger.isTraceEnabled()>
				<cfset logger.trace("Not waiting to join message threads.") />
			</cfif>
			<cfset StructAppend(pendingThreadIds, launchedThreads, "true") />
			<cfset arguments.event.setArg("_publishThreadIds", pendingThreadIds) />

			<cfreturn keepProcessing />
		</cfif>

		<!--- Wait for the launched threads, bounded by this handler's timeout. --->
		<cfif logger.isDebugEnabled()>
			<cfset logger.debug("Joining threads for message named '#getMessageName()#'.") />
		</cfif>

		<cfset results = adapter.join(launchedThreads, getTimeout()) />

		<!---
		Only the first entry of results.errors is logged, wrapped and handed to
		the event context; later entries are not inspected here.
		NOTE(review): results.errors presumably lists the thread ids that failed,
		each usable as a key into results. Confirm against the ThreadingAdapter.
		--->
		<cfif ArrayLen(results.errors)>
			<cfset keepProcessing = false />

			<cfif logger.isErrorEnabled()>
				<cfset logger.error("#results[results.errors[1]].error.message#", results[results.errors[1]].error) />
			</cfif>
			<cfset wrappedException = arguments.eventContext.getRequestHandler().wrapException(results[results.errors[1]].error) />
			<cfset arguments.eventContext.handleException(wrappedException, true) />
		</cfif>

		<cfreturn keepProcessing />
	</cffunction>

init

public MessageHandler init( string messageName, boolean multithreaded, boolean waitForThreads, numeric timeout, ThreadingAdapter threadingAdapter )

Initializes the handler.

Parameters:
string messageName
boolean multithreaded
boolean waitForThreads
numeric timeout
ThreadingAdapter threadingAdapter

Code:

 	<cffunction
		name="init"
		access="public"
		returntype="MessageHandler"
		output="false"
		hint="Initializes the handler.">
		<cfargument name="messageName" type="string" required="true" />
		<cfargument name="multithreaded" type="boolean" required="true" />
		<cfargument name="waitForThreads" type="boolean" required="true" />
		<cfargument name="timeout" type="numeric" required="true" />
		<cfargument name="threadingAdapter" type="MachII.util.threading.ThreadingAdapter" required="true" />

		<!--- Store each constructor argument through its private setter. --->
		<cfset setMessageName(arguments.messageName) />
		<cfset setMultithreaded(arguments.multithreaded) />
		<cfset setWaitForThreads(arguments.waitForThreads) />
		<cfset setTimeout(arguments.timeout) />
		<cfset setThreadingAdapter(arguments.threadingAdapter) />

		<!--- Return this instance so creation and init() can be chained. --->
		<cfreturn this />
 	</cffunction>

setLog

public void setLog( LogFactory logFactory )

Uses the log factory to create a log.

Parameters:
LogFactory logFactory

Code:

	<cffunction
		name="setLog"
		access="public"
		returntype="void"
		output="false"
		hint="Uses the log factory to create a log.">
		<cfargument name="logFactory" type="MachII.logging.LogFactory" required="true" />

		<!--- The log is requested under this component's own metadata name. --->
		<cfset var componentName = getMetadata(this).name />

		<cfset variables.log = arguments.logFactory.getLog(componentName) />
	</cffunction>

setMessageName

private void setMessageName( string messageName )

Parameters:
string messageName

Code:

	<cffunction
		name="setMessageName"
		access="private"
		returntype="void"
		output="false"
		hint="Sets the name of the message this handler is responsible for.">
		<cfargument name="messageName" type="string" required="true" />

		<!--- Stored in variables.messageName and read back by getMessageName(). --->
		<cfset variables.messageName = arguments.messageName />
	</cffunction>

setMultithreaded

private void setMultithreaded( boolean multithreaded )

Parameters:
boolean multithreaded

Code:

	<cffunction
		name="setMultithreaded"
		access="private"
		returntype="void"
		output="false"
		hint="Sets the multithreaded flag for this handler.">
		<cfargument name="multithreaded" type="boolean" required="true" />

		<!--- Stored in variables.multithreaded and read back by getMultithreaded(). --->
		<cfset variables.multithreaded = arguments.multithreaded />
	</cffunction>

setThreadingAdapter

private void setThreadingAdapter( ThreadingAdapter threadingAdapter )

Sets a threading adapter.

Parameters:
ThreadingAdapter threadingAdapter

Code:

	<cffunction
		name="setThreadingAdapter"
		access="private"
		returntype="void"
		output="false"
		hint="Sets a threading adapter.">
		<cfargument name="threadingAdapter" type="MachII.util.threading.ThreadingAdapter" required="true" />

		<!--- Stored in variables.threadingAdapter and read back by getThreadingAdapter(). --->
		<cfset variables.threadingAdapter = arguments.threadingAdapter />
	</cffunction>

setTimeout

private void setTimeout( numeric timeout )

Parameters:
numeric timeout

Code:

	<cffunction
		name="setTimeout"
		access="private"
		returntype="void"
		output="false"
		hint="Sets the timeout for this handler.">
		<cfargument name="timeout" type="numeric" required="true" />

		<!--- Stored as given in variables.timeout; no range check is applied here. --->
		<cfset variables.timeout = arguments.timeout />
	</cffunction>

setWaitForThreads

private void setWaitForThreads( boolean waitForThreads )

Parameters:
boolean waitForThreads

Code:

	<cffunction
		name="setWaitForThreads"
		access="private"
		returntype="void"
		output="false"
		hint="Sets the wait-for-threads flag for this handler.">
		<cfargument name="waitForThreads" type="boolean" required="true" />

		<!--- Stored in variables.waitForThreads and read back by getWaitForThreads(). --->
		<cfset variables.waitForThreads = arguments.waitForThreads />
	</cffunction>