Skip to content
This repository has been archived by the owner on Jun 16, 2023. It is now read-only.

Commit

Permalink
release 0.9.5.1
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongyan.feng committed Sep 11, 2014
1 parent 6b5c33c commit 4df1462
Show file tree
Hide file tree
Showing 13 changed files with 82 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public class TradeCustomer implements Serializable {
protected final long timestamp = System.currentTimeMillis();
protected Pair trade;
protected Pair customer;
protected String buffer;

public Pair getTrade() {
return trade;
Expand All @@ -34,7 +35,15 @@ public long getTimestamp() {
return timestamp;
}

@Override
public String getBuffer() {
return buffer;
}

public void setBuffer(String buffer) {
this.buffer = buffer;
}

@Override
public String toString() {
return ToStringBuilder.reflectionToString(this,
ToStringStyle.SHORT_PREFIX_STYLE);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.alipay.dw.jstorm.example.sequence.spout;

import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
Expand Down Expand Up @@ -54,6 +55,9 @@ public class SequenceSpout implements IRichSpout {
private boolean isLimited = false;

private long SPOUT_MAX_SEND_NUM;

private int bufferLen = 0;
private Random random;

public boolean isDistributed() {
return true;
Expand Down Expand Up @@ -100,22 +104,40 @@ public void open(Map conf, TopologyContext context,
+ context.getThisTaskId());

MAX_PENDING_COUNTER = getMaxPending(conf);

LOG.info("Finish open");

bufferLen = JStormUtils.parseInt(conf.get("byte.buffer.len"), 0);

random = new Random();
random.setSeed(System.currentTimeMillis());

LOG.info("Finish open, buffer Len:" + bufferLen);

}

private AtomicLong tradeSum = new AtomicLong(0);
private AtomicLong customerSum = new AtomicLong(0);

public void emit() {

String buffer = null;
if (bufferLen > 0) {
byte[] byteBuffer = new byte[bufferLen];

for (int i = 0; i < bufferLen; i++) {
byteBuffer[i] = (byte)random.nextInt(200);
}

buffer = new String(byteBuffer);
}


Pair trade = PairMaker.makeTradeInstance();
Pair customer = PairMaker.makeCustomerInstance();

TradeCustomer tradeCustomer = new TradeCustomer();
tradeCustomer.setTrade(trade);
tradeCustomer.setCustomer(customer);
tradeCustomer.setBuffer(buffer);

tradeSum.addAndGet(trade.getValue());
customerSum.addAndGet(customer.getValue());
Expand Down
8 changes: 8 additions & 0 deletions history.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
[JStorm 0.9.0 ½éÉÜ](http://wenku.baidu.com/view/59e81017dd36a32d7375818b.html)

#Release 0.9.5.1
1. Add netty sync mode
2. Add block operation in netty async mode
3. Replace exception with Throwable in executor layer
4. Upgrade curator-framework version from 1.15 to 1.3.2
5. Add more netty junit test
6. Add log when queue is full

#Release 0.9.5
##Big feature:
1. Redesign scheduler arithmetic, basing worker not task .
Expand Down
8 changes: 4 additions & 4 deletions jstorm-client-extension/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@
<parent>
<groupId>com.alibaba.jstorm</groupId>
<artifactId>jstorm-all</artifactId>
<version>0.9.5</version>
<version>0.9.5.1</version>
<relativePath>..</relativePath>
</parent>
</parent>
<!--<parent>
<groupId>com.taobao</groupId>
<artifactId>parent</artifactId>
<version>1.0.2</version>
</parent> -->
</parent>-->
<modelVersion>4.0.0</modelVersion>
<groupId>com.alibaba.jstorm</groupId>
<artifactId>jstorm-client-extension</artifactId>
<version>0.9.5</version>
<version>0.9.5.1</version>
<packaging>jar</packaging>
<name>${project.artifactId}-${project.version}</name>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,4 +376,13 @@ public static boolean isNettySyncMode(Map conf) {
public static void setNettySyncMode(Map conf, boolean sync) {
conf.put(NETTY_SYNC_MODE, sync);
}

protected static String NETTY_ASYNC_BLOCK = "storm.messaging.netty.async.block";
public static boolean isNettyASyncBlock(Map conf) {
return JStormUtils.parseBoolean(conf.get(NETTY_ASYNC_BLOCK), true);
}

public static void setNettyASyncBlock(Map conf, boolean block) {
conf.put(NETTY_ASYNC_BLOCK, block);
}
}
6 changes: 3 additions & 3 deletions jstorm-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@
<parent>
<groupId>com.alibaba.jstorm</groupId>
<artifactId>jstorm-all</artifactId>
<version>0.9.5</version>
<version>0.9.5.1</version>
<relativePath>..</relativePath>
</parent>
<!--<parent>
<!-- <parent>
<groupId>com.taobao</groupId>
<artifactId>parent</artifactId>
<version>1.0.2</version>
</parent>-->
<modelVersion>4.0.0</modelVersion>
<groupId>com.alibaba.jstorm</groupId>
<artifactId>jstorm-client</artifactId>
<version>0.9.5</version>
<version>0.9.5.1</version>
<packaging>jar</packaging>
<name>${project.artifactId}-${project.version}</name>

Expand Down
8 changes: 4 additions & 4 deletions jstorm-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@
<parent>
<groupId>com.alibaba.jstorm</groupId>
<artifactId>jstorm-all</artifactId>
<version>0.9.5</version>
<version>0.9.5.1</version>
<relativePath>..</relativePath>
</parent>
</parent>
<!-- <parent>
<groupId>com.taobao</groupId>
<artifactId>parent</artifactId>
<version>1.0.2</version>
</parent> -->
</parent>-->
<modelVersion>4.0.0</modelVersion>
<groupId>com.alibaba.jstorm</groupId>
<artifactId>jstorm-server</artifactId>
<version>0.9.5</version>
<version>0.9.5.1</version>
<packaging>jar</packaging>
<name>${project.artifactId}-${project.version}</name>
<description>jstorm server modules</description>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class NettyClientAsync extends NettyClient {

protected AtomicBoolean flush_later;
protected int flushCheckInterval;
protected final boolean isWithAcker;
protected final boolean blockSend;

boolean isDirectSend(Map conf) {

Expand All @@ -44,6 +44,14 @@ boolean isDirectSend(Map conf) {

return !ConfigExtension.isNettyTransferAsyncBatch(conf);
}

boolean isBlockSend(Map storm_conf) {
if (ConfigExtension.isTopologyContainAcker(storm_conf) == false) {
return false;
}

return ConfigExtension.isNettyASyncBlock(storm_conf);
}

@SuppressWarnings("rawtypes")
NettyClientAsync(Map storm_conf, ChannelFactory factory,
Expand All @@ -54,7 +62,7 @@ boolean isDirectSend(Map conf) {
BATCH_THREASHOLD_WARN = ConfigExtension
.getNettyBufferThresholdSize(storm_conf);

isWithAcker = ConfigExtension.isTopologyContainAcker(storm_conf);
blockSend = isBlockSend(storm_conf);

directlySend = isDirectSend(storm_conf);

Expand Down Expand Up @@ -174,7 +182,7 @@ void handleFailedChannel(MessageBatch messageBatch) {
/ BATCH_THREASHOLD_WARN;
long sleepMs = count * 10;

if (isWithAcker == false) {
if (blockSend == false) {
LOG.warn(
"Target server {} is unavailable, pending {}, bufferSize {}, block sending {}ms",
name, pendings.get(), cachedSize, sleepMs);
Expand Down Expand Up @@ -294,7 +302,7 @@ Channel isChannelReady() {
return null;
}

if (isWithAcker == true && pendings.get() >= MAX_SEND_PENDING) {
if (blockSend == true && pendings.get() >= MAX_SEND_PENDING) {
return null;
}
return channel;
Expand Down
4 changes: 4 additions & 0 deletions jstorm-server/src/main/resources/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,11 @@ storm.messaging.netty.flush.check.interval.ms: 10
# it will slow down the netty sending speed
storm.messaging.netty.buffer.threshold: 8388608
storm.messaging.netty.max.pending: 16
## send message with sync or async mode
storm.messaging.netty.sync.mode: false
## when netty is in sync mode and client channel is unavailable,
## it will block sending until channel is ready
storm.messaging.netty.async.block: true

### topology.* configs are for specific executing storms
topology.enable.message.timeouts: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class NettyUnitTest {
ConfigExtension.setNettySyncMode(storm_conf, true);
} else {
ConfigExtension.setNettySyncMode(storm_conf, false);
ConfigExtension.setNettyASyncBlock(storm_conf, false);
}

}
Expand Down
2 changes: 1 addition & 1 deletion jstorm-ui/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>com.alibaba.jstorm</groupId>
<artifactId>jstorm-all</artifactId>
<version>0.9.5</version>
<version>0.9.5.1</version>
<relativePath>..</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion jstorm-ui/src/main/webapp/cluster.xhtml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
<f:facet name="header">
<h:outputText value="version" />
</f:facet>
<h:outputText value="0.9.5" />
<h:outputText value="0.9.5.1" />
</p:column>

<p:column>
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.alibaba.jstorm</groupId>
<artifactId>jstorm-all</artifactId>
<version>0.9.5</version>
<version>0.9.5.1</version>
<packaging>pom</packaging>
<name>java storm</name>
<description>java storm</description>
Expand Down

0 comments on commit 4df1462

Please sign in to comment.