| Package: MachII.framework |
| Handles processing of message subscribers from publish commands. |
<!--- License: Copyright 2008 GreatBizTools, LLC Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. Copyright: GreatBizTools, LLC Author: Peter J. Farrell (peter@mach-ii.com) $Id: MessageHandler.cfc 962 2008-08-12 18:18:04Z peterfarrell $ Created version: 1.6.0 Updated version: 1.6.0 Notes: ---> |
| Method Summary | |
|---|---|
| public MessageHandler |
init(string messageName, boolean multithreaded, boolean waitForThreads, numeric timeout, ThreadingAdapter threadingAdapter)
Initializes the handler. |
| public void |
addMessageSubscriber([MessageSubscriber messageSubscriber])
Registers a subscriber to this message. |
| public Log |
getLog()
Gets the log. |
| public string | getMessageName() |
| public struct |
getMessageSubscribers()
Gets all message subscribers. |
| public boolean | getMultithreaded() |
| public array |
getSubscriberNames()
Gets an array of message subscriber invoker names. |
| private ThreadingAdapter |
getThreadingAdapter()
Gets a threading adapter. |
| public numeric | getTimeout() |
| public boolean | getWaitForThreads() |
| public boolean |
handleMessage(Event event, EventContext eventContext)
Handles the message. |
| public void |
setLog(LogFactory logFactory)
Uses the log factory to create a log. |
| private void | setMessageName(string messageName) |
| private void | setMultithreaded(boolean multithreaded) |
| private void |
setThreadingAdapter(ThreadingAdapter threadingAdapter)
Sets a threading adapter. |
| private void | setTimeout(numeric timeout) |
| private void | setWaitForThreads(boolean waitForThreads) |
| Method Detail |
|---|
| addMessageSubscriber |
|---|
public void addMessageSubscriber( [MessageSubscriber messageSubscriber] )
Registers a subscriber to this message.
Parameters:
| [MessageSubscriber messageSubscriber] |
Code:
<cffunction name="addMessageSubscriber" access="public" returntype="void" output="false"
	hint="Registers a subscriber to this message.">
	<!--- The argument is dereferenced unconditionally below, so it is declared
		required: omitting it now fails argument validation up front rather than
		surfacing as an undefined-variable error inside the body. --->
	<cfargument name="messageSubscriber" type="MachII.framework.MessageSubscriber" required="true"
		hint="The subscriber to register for this message." />

	<!--- Subscribers are stored under "<listenerName>_<method>"; registering the same
		listener/method pair again overwrites the previous entry. --->
	<cfset var key = arguments.messageSubscriber.getListenerName() & "_" & arguments.messageSubscriber.getMethod() />

	<cfset variables.messageSubscribers[key] = arguments.messageSubscriber />
</cffunction>
| getLog |
|---|
public Log getLog( )
Gets the log.
Parameters:
Code:
<!--- Accessor for the log held in variables.log (assigned by setLog()). --->
<cffunction name="getLog"
	access="public"
	returntype="MachII.logging.Log"
	output="false"
	hint="Gets the log.">
	<cfreturn variables.log />
</cffunction>
| getMessageName |
|---|
public string getMessageName( )
Parameters:
Code:
<!--- Accessor for the message name held in variables.messageName. --->
<cffunction name="getMessageName"
	access="public"
	returntype="string"
	output="false">
	<cfreturn variables.messageName />
</cffunction>
| getMessageSubscribers |
|---|
public struct getMessageSubscribers( )
Gets all message subscribers.
Parameters:
Code:
<!--- Accessor for the subscriber struct held in variables.messageSubscribers.
	The stored struct itself is returned (no copy is made here). --->
<cffunction name="getMessageSubscribers"
	access="public"
	returntype="struct"
	output="false"
	hint="Gets all message subscribers.">
	<cfreturn variables.messageSubscribers />
</cffunction>
| getMultithreaded |
|---|
public boolean getMultithreaded( )
Parameters:
Code:
<!--- Accessor for the multithreaded flag held in variables.multithreaded. --->
<cffunction name="getMultithreaded"
	access="public"
	returntype="boolean"
	output="false">
	<cfreturn variables.multithreaded />
</cffunction>
| getSubscriberNames |
|---|
public array getSubscriberNames( )
Gets an array of message subscriber invoker names.
Parameters:
Code:
<!--- Returns the keys of the subscriber struct as an array. --->
<cffunction name="getSubscriberNames"
	access="public"
	returntype="array"
	output="false"
	hint="Gets an array of message subscriber invoker names.">
	<cfset var subscriberNames = StructKeyArray(variables.messageSubscribers) />

	<cfreturn subscriberNames />
</cffunction>
| getThreadingAdapter |
|---|
private ThreadingAdapter getThreadingAdapter( )
Gets a threading adapter.
Parameters:
Code:
<!--- Private accessor for the adapter held in variables.threadingAdapter. --->
<cffunction name="getThreadingAdapter"
	access="private"
	returntype="MachII.util.threading.ThreadingAdapter"
	output="false"
	hint="Gets a threading adapter.">
	<cfreturn variables.threadingAdapter />
</cffunction>
| getTimeout |
|---|
public numeric getTimeout( )
Parameters:
Code:
<!--- Accessor for the timeout held in variables.timeout; handleMessage() passes
	this value to the threading adapter when joining threads. --->
<cffunction name="getTimeout"
	access="public"
	returntype="numeric"
	output="false">
	<cfreturn variables.timeout />
</cffunction>
| getWaitForThreads |
|---|
public boolean getWaitForThreads( )
Parameters:
Code:
<!--- Accessor for the wait-for-threads flag held in variables.waitForThreads. --->
<cffunction name="getWaitForThreads"
	access="public"
	returntype="boolean"
	output="false">
	<cfreturn variables.waitForThreads />
</cffunction>
| handleMessage |
|---|
public boolean handleMessage( Event event, EventContext eventContext )
Handles the message.
Parameters:
| Event event |
| EventContext eventContext |
Code:
<cffunction name="handleMessage" access="public" returntype="boolean" output="false"
	hint="Handles the message.">
	<cfargument name="event" type="MachII.framework.Event" required="true" />
	<cfargument name="eventContext" type="MachII.framework.EventContext" required="true" />

	<!---
		Invokes every registered subscriber for this message.

		Serial mode: each subscriber's invokeListener() is called in turn with the event.
		Threaded mode (multithreaded flag set AND the adapter allows threading): each
		subscriber is started through the threading adapter. If waitForThreads is set the
		threads are joined and the first reported error is handed to the event context;
		otherwise the started thread ids are merged into the event's "_publishThreadIds" arg.

		Returns false only when joined threads reported at least one error; true otherwise.
	--->
	<cfset var subscriberMap = getMessageSubscribers() />
	<cfset var adapter = getThreadingAdapter() />
	<cfset var startedThreads = StructNew() />
	<cfset var pendingThreadIds = arguments.event.getArg("_publishThreadIds", StructNew()) />
	<cfset var invokeArgs = StructNew() />
	<cfset var joinResults = StructNew() />
	<cfset var wrappedException = "" />
	<cfset var keepProcessing = true />
	<cfset var logger = getLog() />
	<cfset var subscriberKey = "" />
	<cfset var threadId = "" />
	<cfset var firstError = "" />

	<cfif NOT getMultithreaded() OR NOT adapter.allowThreading()>
		<!--- Serial path: call each subscriber directly on the current thread. --->
		<cfif logger.isDebugEnabled()>
			<cfset logger.debug("Received published message named '#getMessageName()#' (running in serial).") />
		</cfif>

		<cfloop collection="#subscriberMap#" item="subscriberKey">
			<cfset subscriberMap[subscriberKey].invokeListener(arguments.event) />
		</cfloop>
	<cfelse>
		<!--- Threaded path: start one adapter run per subscriber. --->
		<cfif logger.isDebugEnabled()>
			<cfset logger.debug("Received published message named '#getMessageName()#' (running in multi-threaded).") />
		</cfif>

		<cfset invokeArgs.event = arguments.event />

		<!--- Remember which subscriber each returned thread id belongs to. --->
		<cfloop collection="#subscriberMap#" item="subscriberKey">
			<cfset threadId = adapter.run(subscriberMap[subscriberKey], "invokeListener", invokeArgs) />
			<cfset startedThreads[threadId] = subscriberKey />
		</cfloop>

		<cfif NOT getWaitForThreads()>
			<!--- Not joining here: publish the started thread ids on the event instead. --->
			<cfif logger.isTraceEnabled()>
				<cfset logger.trace("Not waiting to join message threads.") />
			</cfif>

			<cfset StructAppend(pendingThreadIds, startedThreads, "true") />
			<cfset arguments.event.setArg("_publishThreadIds", pendingThreadIds) />
		<cfelse>
			<cfif logger.isDebugEnabled()>
				<cfset logger.debug("Joining threads for message named '#getMessageName()#'.") />
			</cfif>

			<cfset joinResults = adapter.join(startedThreads, getTimeout()) />

			<cfif ArrayLen(joinResults.errors)>
				<cfset keepProcessing = false />

				<!--- Only the first entry of the errors array is reported; each entry
					is used as a key into the join results. --->
				<cfset firstError = joinResults[joinResults.errors[1]].error />

				<cfif logger.isErrorEnabled()>
					<cfset logger.error("#firstError.message#", firstError) />
				</cfif>

				<cfset wrappedException = arguments.eventContext.getRequestHandler().wrapException(firstError) />
				<cfset arguments.eventContext.handleException(wrappedException, true) />
			</cfif>
		</cfif>
	</cfif>

	<cfreturn keepProcessing />
</cffunction>
| init |
|---|
public MessageHandler init( string messageName, boolean multithreaded, boolean waitForThreads, numeric timeout, ThreadingAdapter threadingAdapter )
Initializes the handler.
Parameters:
| string messageName |
| boolean multithreaded |
| boolean waitForThreads |
| numeric timeout |
| ThreadingAdapter threadingAdapter |
Code:
<!--- Constructor: stores each argument through its private setter and returns
	this instance so the call can be chained. --->
<cffunction name="init"
	access="public"
	returntype="MessageHandler"
	output="false"
	hint="Initializes the handler.">
	<cfargument name="messageName" type="string" required="true" />
	<cfargument name="multithreaded" type="boolean" required="true" />
	<cfargument name="waitForThreads" type="boolean" required="true" />
	<cfargument name="timeout" type="numeric" required="true" />
	<cfargument name="threadingAdapter" type="MachII.util.threading.ThreadingAdapter" required="true" />

	<!--- Message identity --->
	<cfset setMessageName(arguments.messageName) />

	<!--- Threading configuration --->
	<cfset setMultithreaded(arguments.multithreaded) />
	<cfset setWaitForThreads(arguments.waitForThreads) />
	<cfset setTimeout(arguments.timeout) />
	<cfset setThreadingAdapter(arguments.threadingAdapter) />

	<cfreturn this />
</cffunction>
| setLog |
|---|
public void setLog( LogFactory logFactory )
Uses the log factory to create a log.
Parameters:
| LogFactory logFactory |
Code:
<!--- Asks the supplied factory for a log named after this component (the name
	reported by getMetadata(this)) and stores it in variables.log. --->
<cffunction name="setLog"
	access="public"
	returntype="void"
	output="false"
	hint="Uses the log factory to create a log.">
	<cfargument name="logFactory" type="MachII.logging.LogFactory" required="true" />

	<cfset var componentName = getMetadata(this).name />

	<cfset variables.log = arguments.logFactory.getLog(componentName) />
</cffunction>
| setMessageName |
|---|
private void setMessageName( string messageName )
Parameters:
| string messageName |
Code:
<!--- Private mutator: stores the message name in variables.messageName. --->
<cffunction name="setMessageName"
	access="private"
	returntype="void"
	output="false">
	<cfargument name="messageName" type="string" required="true" />

	<cfset variables.messageName = arguments.messageName />
</cffunction>
| setMultithreaded |
|---|
private void setMultithreaded( boolean multithreaded )
Parameters:
| boolean multithreaded |
Code:
<!--- Private mutator: stores the multithreaded flag in variables.multithreaded. --->
<cffunction name="setMultithreaded"
	access="private"
	returntype="void"
	output="false">
	<cfargument name="multithreaded" type="boolean" required="true" />

	<cfset variables.multithreaded = arguments.multithreaded />
</cffunction>
| setThreadingAdapter |
|---|
private void setThreadingAdapter( ThreadingAdapter threadingAdapter )
Sets a threading adapter.
Parameters:
| ThreadingAdapter threadingAdapter |
Code:
<!--- Private mutator: stores the adapter in variables.threadingAdapter. --->
<cffunction name="setThreadingAdapter"
	access="private"
	returntype="void"
	output="false"
	hint="Sets a threading adapter.">
	<cfargument name="threadingAdapter" type="MachII.util.threading.ThreadingAdapter" required="true" />

	<cfset variables.threadingAdapter = arguments.threadingAdapter />
</cffunction>
| setTimeout |
|---|
private void setTimeout( numeric timeout )
Parameters:
| numeric timeout |
Code:
<!--- Private mutator: stores the timeout in variables.timeout. --->
<cffunction name="setTimeout"
	access="private"
	returntype="void"
	output="false">
	<cfargument name="timeout" type="numeric" required="true" />

	<cfset variables.timeout = arguments.timeout />
</cffunction>
| setWaitForThreads |
|---|
private void setWaitForThreads( boolean waitForThreads )
Parameters:
| boolean waitForThreads |
Code:
<!--- Private mutator: stores the wait-for-threads flag in variables.waitForThreads. --->
<cffunction name="setWaitForThreads"
	access="private"
	returntype="void"
	output="false">
	<cfargument name="waitForThreads" type="boolean" required="true" />

	<cfset variables.waitForThreads = arguments.waitForThreads />
</cffunction>