1 /**
2 * $Id: ServerSocketWorker.java 67 2006-11-09 05:26:18Z maldito_orco $
3 * $Revision: 67 $
4 * $Date: 2006-11-09 02:26:18 -0300 (Thu, 09 Nov 2006) $
5 *
6 * =========================================================================
7 *
8 * Copyright 2005 Tubo
9 *
10 * Licensed under the Apache License, Version 2.0 (the "License");
11 * you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
13 *
14 * http://www.apache.org/licenses/LICENSE-2.0
15 *
16 * Unless required by applicable law or agreed to in writing, software
17 * distributed under the License is distributed on an "AS IS" BASIS,
18 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 * See the License for the specific language governing permissions and
20 * limitations under the License.
21 */
22 package org.tubo.resource.consumer.serversocket;
23
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27
28 import org.tubo.resource.consumer.ConsumerManager;
29 import org.tubo.exception.TuboConsumerLoaderException;
30 import org.tubo.exception.TuboResourceException;
31
32 import java.net.ServerSocket;
33 import java.net.Socket;
34
35 import java.io.IOException;
36
37 import edu.emory.mathcs.backport.java.util.concurrent.Executors;
38 import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
39
40 /**
41 * This class listen for new ServerSocket connections, when a new connection
42 * arrives, take a thead from the pool and a consumer and instance a
43 * WorkerProcessor for work with the new connection/consumer.
44 *
45 * Created: Sep 7, 2006, 12:51:43 AM
46 * Last Modification Date: $Date: 2006-11-09 02:26:18 -0300 (Thu, 09 Nov 2006) $
47 *
48 * @author maldito_orco (maldito_orco@users.sourceforge.net)
49 * @version $Revision: 67 $
50 */
51 public class ServerSocketWorker implements Runnable {
52 public static final String RCS_ID = "$Id: ServerSocketWorker.java 67 2006-11-09 05:26:18Z maldito_orco $";
53 private static Log log = LogFactory.getLog(ServerSocketWorker.class);
54
55 public static final int STATE_LOADING = 0;
56 public static final int STATE_RUNNING = 1;
57 public static final int STATE_UNLOADING = 2;
58 public static final int STATE_STOPED = 3;
59 public static final int STATE_ERROR = 4;
60
61 public static final int DEFAULT_MAX_CAPACITY = 1;
62
63 /** TODO */
64 private ConsumerManager manager = null;
65 /** TODO */
66 private int port = -1;
67 /** TODO */
68 private ServerSocket serverSocket = null;
69 /** TODO */
70 private ExecutorService executor = null;
71 /** TODO */
72 private int state = STATE_STOPED;
73 /** TODO */
74 private TuboConsumerLoaderException lastException = null;
75
76
77 /**
78 * TODO
79 * @param manager
80 * @param port
81 * @throws TuboConsumerLoaderException
82 */
83 public ServerSocketWorker(ConsumerManager manager, int port) throws TuboConsumerLoaderException {
84 this.manager = manager;
85 this.port = port;
86
87
88 try {
89 serverSocket = new ServerSocket(this.port);
90 } catch (IOException e) {
91 state = STATE_ERROR;
92 lastException = new TuboConsumerLoaderException("Error creating ServerSocket",e);
93 throw lastException;
94 }
95
96
97 String smaxCapacity = manager.getProperty("maxCapacity");
98 int maxCapacity;
99 if(smaxCapacity!=null)
100 try {
101 maxCapacity = Integer.parseInt(smaxCapacity);
102 } catch (NumberFormatException e) {
103 maxCapacity = DEFAULT_MAX_CAPACITY;
104 log.error("Malformed maxCapacity property in consumer id="+manager.getId()+". Using default("+DEFAULT_MAX_CAPACITY+")");
105 }
106 else {
107 maxCapacity = DEFAULT_MAX_CAPACITY;
108 log.info("ServerSocketConsumer id="+manager.getId()+" hasn't defined maxCapacity property. Using default("+DEFAULT_MAX_CAPACITY+")");
109 }
110
111
112 executor = Executors.newFixedThreadPool(maxCapacity);
113 }
114
115 /**
116 * TODO
117 */
118 public void run() {
119
120
121 state = STATE_RUNNING;
122
123
124 while (state<=STATE_RUNNING) {
125
126
127 Socket socket = null;
128 try {
129 socket = serverSocket.accept();
130 } catch (IOException e) {
131 state = STATE_ERROR;
132 lastException = new TuboConsumerLoaderException("Error accepting socket",e);
133 break;
134 }
135
136
137 ServerSocketConsumer consumer = null;
138 try {
139 consumer = (ServerSocketConsumer)manager.getConsumer();
140 } catch (TuboResourceException e) {
141 state = STATE_ERROR;
142 lastException = new TuboConsumerLoaderException("Error getting consumer",e);
143 break;
144 }
145
146
147 ServerSocketConsumerWorker consumerWorker = new ServerSocketConsumerWorker(socket,consumer,manager);
148
149
150 try {
151 executor.execute(consumerWorker);
152 } catch (Exception e) {
153
154 }
155 }
156 }
157
158
159 }