View Javadoc

1   /**
2    * $Id: 67 2006-11-09 05:26:18Z maldito_orco $
3    * $Revision: 67 $
4    * $Date: 2006-11-09 02:26:18 -0300 (Thu, 09 Nov 2006) $
5    *
6    * =========================================================================
7    *
8    * Copyright 2005 Tubo
9    *
10   *  Licensed under the Apache License, Version 2.0 (the "License");
11   *  you may not use this file except in compliance with the License.
12   *  You may obtain a copy of the License at
13   *
14   *      http://www.apache.org/licenses/LICENSE-2.0
15   *
16   *  Unless required by applicable law or agreed to in writing, software
17   *  distributed under the License is distributed on an "AS IS" BASIS,
18   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19   *  See the License for the specific language governing permissions and
20   *  limitations under the License.
21   */
22  package org.tubo.resource.consumer.serversocket;
//log
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
//tubo
import org.tubo.resource.consumer.ConsumerManager;
import org.tubo.exception.TuboConsumerLoaderException;
import org.tubo.exception.TuboResourceException;
//net
import java.net.ServerSocket;
import java.net.Socket;
//io
import java.io.IOException;
//concurrent
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
40  /**
41   * This class listen for new ServerSocket connections, when a new connection
42   * arrives, take a thead from the pool and a consumer and instance a
43   * WorkerProcessor for work with the new connection/consumer.
44   *
45   * Created: Sep 7, 2006, 12:51:43 AM
46   * Last Modification Date: $Date: 2006-11-09 02:26:18 -0300 (Thu, 09 Nov 2006) $
47   *
48   * @author maldito_orco (
49   * @version $Revision: 67 $
50   */
51  public class ServerSocketWorker implements Runnable {
52      public static final String RCS_ID = "$Id: 67 2006-11-09 05:26:18Z maldito_orco $";
53      private static Log log = LogFactory.getLog(ServerSocketWorker.class);
55      public static final int STATE_LOADING = 0;
56      public static final int STATE_RUNNING = 1;
57      public static final int STATE_UNLOADING = 2;
58      public static final int STATE_STOPED = 3;
59      public static final int STATE_ERROR = 4;
61      public static final int DEFAULT_MAX_CAPACITY = 1;
63      /** TODO */
64      private ConsumerManager manager = null;
65      /** TODO */
66      private int port = -1;
67      /** TODO */
68      private ServerSocket serverSocket = null;
69      /** TODO */
70      private ExecutorService executor = null;
71      /** TODO */
72      private int state = STATE_STOPED;
73      /** TODO */
74      private TuboConsumerLoaderException lastException = null;
77      /**
78       * TODO
79       * @param manager
80       * @param port
81       * @throws TuboConsumerLoaderException
82       */
83      public ServerSocketWorker(ConsumerManager manager, int port) throws TuboConsumerLoaderException {
84          this.manager = manager;
85          this.port = port;
86          //
87          // create ServerSocket
88          try {
89              serverSocket = new ServerSocket(this.port);
90          } catch (IOException e) {
91              state = STATE_ERROR;
92              lastException = new TuboConsumerLoaderException("Error creating ServerSocket",e);
93              throw lastException;
94          }
95          //
96          // get maxCapacity
97          String smaxCapacity = manager.getProperty("maxCapacity");
98          int maxCapacity;
99          if(smaxCapacity!=null)
100             try {
101                 maxCapacity = Integer.parseInt(smaxCapacity);
102             } catch (NumberFormatException e) {
103                 maxCapacity = DEFAULT_MAX_CAPACITY;
104                 log.error("Malformed maxCapacity property in consumer id="+manager.getId()+". Using default("+DEFAULT_MAX_CAPACITY+")");
105             }
106         else {
107             maxCapacity = DEFAULT_MAX_CAPACITY;
108   "ServerSocketConsumer id="+manager.getId()+" hasn't defined maxCapacity property. Using default("+DEFAULT_MAX_CAPACITY+")");
109         }
110         //
111         // create executor
112         executor = Executors.newFixedThreadPool(maxCapacity);
113     }
115     /**
116      * TODO
117      */
118     public void run() {
119         //
120         // set running state
121         state = STATE_RUNNING;
122         //
123         // worker cycle
124         while (state<=STATE_RUNNING) {
125             //
126             // wait for client
127             Socket socket = null;
128             try {
129                 socket = serverSocket.accept();
130             } catch (IOException e) {
131                 state = STATE_ERROR;
132                 lastException = new TuboConsumerLoaderException("Error accepting socket",e);
133                 break;
134             }
135             //
136             // get a consumer for this socket (client)
137             ServerSocketConsumer consumer = null;
138             try {
139                 consumer = (ServerSocketConsumer)manager.getConsumer();
140             } catch (TuboResourceException e) {
141                 state = STATE_ERROR;
142                 lastException = new TuboConsumerLoaderException("Error getting consumer",e);
143                 break;
144             }
145             //
146             // create a process thread for this socket
147             ServerSocketConsumerWorker consumerWorker = new ServerSocketConsumerWorker(socket,consumer,manager);
148             //
149             // runs process for this socket
150             try {
151                 executor.execute(consumerWorker);
152             } catch (Exception e) {
153                 //FIXME: Add a exception management at this point
154             }
155         }
156     }
159 }