Skip to content

Commit

Permalink
use direct ByteBuffer to store audio files
Browse files Browse the repository at this point in the history
  • Loading branch information
Gennadiy Dubina committed Mar 14, 2017
1 parent 23dba68 commit 14407dd
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 100 deletions.
2 changes: 1 addition & 1 deletion io/network/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.34.Final</version>
<version>4.1.9.Final</version>
</dependency>
</dependencies>

Expand Down
8 changes: 4 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
<module>core</module>
<module>client</module>
<module>controls</module>
<module>docs</module>
<!--<module>docs</module>-->
<module>bootstrap</module>
</modules>

Expand Down Expand Up @@ -135,7 +135,7 @@
<module>core</module>
<module>controls</module>
<module>client</module>
<module>docs</module>
<!--<module>docs</module>-->
<module>bootstrap</module>
</modules>
<build>
Expand Down Expand Up @@ -165,7 +165,7 @@
</execution>
</executions>
</plugin>
<plugin>
<!--<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.8.1</version>
Expand All @@ -178,7 +178,7 @@
</goals>
</execution>
</executions>
</plugin>
</plugin>-->
</plugins>
</build>
</profile>
Expand Down
12 changes: 7 additions & 5 deletions resources/mediaplayer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,18 @@
<artifactId>mbrola</artifactId>
<version>${version.freetts}</version>
</dependency>
<dependency>
<groupId>org.ehcache</groupId>
<artifactId>ehcache</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.5.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.netty/netty-buffer -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>4.1.9.Final</version>
</dependency>

<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
package org.mobicents.media.server.impl.resource.mediaplayer.audio;

import com.google.common.cache.*;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
import org.ehcache.Cache;
import org.ehcache.CacheManager;
import org.ehcache.config.builders.CacheConfigurationBuilder;
import org.ehcache.config.builders.CacheManagerBuilder;
import org.ehcache.config.builders.ResourcePoolsBuilder;
import org.ehcache.config.units.MemoryUnit;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.Map;
import java.util.concurrent.Callable;

/**
* Created by achikin on 5/9/16.
Expand All @@ -24,90 +20,46 @@ public class CachedRemoteStreamProvider implements RemoteStreamProvider {

private final static Logger log = Logger.getLogger(CachedRemoteStreamProvider.class);

private CacheManager cacheManager;

private ConcurrentHashMap<String, ByteStreamDownloader> inProgress = new ConcurrentHashMap<>();
private Cache<String, ByteBuf> cache;

public CachedRemoteStreamProvider(int size) {
log.info("Create AudioCache with size: " + size + "Mb");
cacheManager = CacheManagerBuilder.newCacheManagerBuilder()
.withCache("preConfigured",
CacheConfigurationBuilder.newCacheConfigurationBuilder(String.class, byte[].class,
ResourcePoolsBuilder.newResourcePoolsBuilder().offheap(size, MemoryUnit.MB))
.build())
.build(true);
}

private Cache<String, byte[]> getCache() {
return cacheManager.getCache("preConfigured", String.class, byte[].class);
}

public InputStream getStream(URL uri) throws IOException {
String key = uri.toString();
Cache<String, byte[]> cache = getCache();

byte[] stream = cache.get(key);
if (stream == null) {
stream = download(cache, uri);
}

return new ByteArrayInputStream(stream);
}

private byte[] download(Cache<String, byte[]> cache, final URL uri) throws IOException {
String key = uri.toString();
ByteStreamDownloader stream = inProgress.get(key);
if (stream == null) {
stream = new ByteStreamDownloader();
ByteStreamDownloader prev = inProgress.putIfAbsent(key, stream);
if (prev == null) {
//check bytes in cache again too, maybe it's already added
byte[] bytes = cache.get(key);
if (bytes != null) {
return bytes;
cache = CacheBuilder.newBuilder().maximumWeight(size * 1024L * 1024L).weigher(new Weigher<String, ByteBuf>() {
@Override
public int weigh(String s, ByteBuf byteBuf) {
return byteBuf.capacity();
}
}).removalListener(new RemovalListener<String, ByteBuf>() {
@Override
public void onRemoval(RemovalNotification<String, ByteBuf> removalNotification) {
ByteBuf buf = removalNotification.getValue();
if (buf != null) {
buf.release();
}
} else {
stream = prev;
}
}
}).build();
}

public InputStream getStream(final URL uri) throws IOException {
final String key = uri.toString();
try {
byte[] bytes = stream.download(uri);
if (bytes != null) {
cache.putIfAbsent(key, bytes);
} else {
bytes = cache.get(key);
}
if (bytes == null) {
throw new IOException("No data for " + uri);
}
return bytes;
} finally {
inProgress.remove(key);
ByteBuf buf = cache.get(key, new Callable<ByteBuf>() {
@Override
public ByteBuf call() throws Exception {
byte[] bytes = IOUtils.toByteArray(uri.openStream());
return Unpooled.directBuffer(bytes.length).writeBytes(bytes);
}
});
return new ByteBufInputStream(buf.retainedDuplicate(), true);
} catch (Throwable e) {
throw new IOException(e);
}
}

private static class ByteStreamDownloader {

private Lock lock = new ReentrantLock();

volatile boolean downloaded;

public byte[] download(final URL uri) throws IOException {
if (downloaded) {
return null;
}
lock.lock();
try {
//need to check twice
if (downloaded) {
return null;
}
byte[] bytes = IOUtils.toByteArray(uri.openStream());
downloaded = bytes != null;
return bytes;
} finally {
lock.unlock();
}
public void dump() {
log.info("--- Cache dump ---");
for (Map.Entry<String, ByteBuf> e : cache.asMap().entrySet()) {
log.info(e.getKey() + "; " + e.getValue().refCnt());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.concurrent.ExecutionException;

/**
* Created by achikin on 5/9/16.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,15 @@
import org.mobicents.media.server.impl.resource.mediaplayer.audio.DirectRemoteStreamProvider;
import org.mobicents.media.server.spi.format.EncodingName;
import org.mobicents.media.server.spi.format.Format;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

import javax.sound.sampled.UnsupportedAudioFileException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.net.URLStreamHandler;
Expand Down Expand Up @@ -66,15 +63,19 @@ public void testCache() throws IOException, UnsupportedAudioFileException {
WavTrackImpl track1 = new WavTrackImpl(url1, cache);
assertEquals(expectedFormat.getName(), track1.getFormat().getName());
assertEquals(expectedDuration, track1.getDuration());
track1.close();

WavTrackImpl track2 = new WavTrackImpl(url2, cache);
assertEquals(expectedFormat.getName(), track2.getFormat().getName());
assertEquals(expectedDuration, track2.getDuration());
track2.close();

WavTrackImpl track3 = new WavTrackImpl(url2, cache);
assertEquals(expectedFormat.getName(), track3.getFormat().getName());
assertEquals(expectedDuration, track3.getDuration());
track3.close();

cache.dump();
verify(mockConnection).getInputStream();
}

Expand All @@ -84,24 +85,28 @@ public void testCacheOverflow() throws IOException, UnsupportedAudioFileExceptio
//1Mb cache contains have 15 full files
int cacheSize = 1;
double fileSize = 61712d;
int iteration = (int) Math.floor(cacheSize * 1024d * 1024d / fileSize) - 1;

//we have 4 segments in guava cache
int iteration = 8;//(int) Math.floor(cacheSize * 1024d * 1024d / fileSize) - 1;
CachedRemoteStreamProvider cache = new CachedRemoteStreamProvider(1);

for (int j = 0; j < 10; j++) {
System.out.println("--- Iteration #: " + (j + 1));
for (int i = 0; i < iteration; i++) {
URL url = new URL(null, "http://test" + i + ".wav", handler);
WavTrackImpl track = new WavTrackImpl(url, cache);
assertEquals(expectedFormat.getName(), track.getFormat().getName());
assertEquals(expectedDuration, track.getDuration());
track.close();
}
cache.dump();
}
verify(mockConnection, Mockito.times(iteration)).getInputStream();
for (int i = iteration; i < 2 * iteration; i++) {
URL url = new URL(null, "http://test" + i + ".wav", handler);
WavTrackImpl track = new WavTrackImpl(url, cache);
assertEquals(expectedFormat.getName(), track.getFormat().getName());
assertEquals(expectedDuration, track.getDuration());
track.close();
}
verify(mockConnection, Mockito.times(2 * iteration)).getInputStream();
}
Expand All @@ -116,14 +121,17 @@ public void testNoCache() throws IOException, UnsupportedAudioFileException {
WavTrackImpl track1 = new WavTrackImpl(url1, noCache);
assertEquals(expectedFormat.getName(), track1.getFormat().getName());
assertEquals(expectedDuration, track1.getDuration());
track1.close();

WavTrackImpl track2 = new WavTrackImpl(url2, noCache);
assertEquals(expectedFormat.getName(), track2.getFormat().getName());
assertEquals(expectedDuration, track2.getDuration());
track2.close();

WavTrackImpl track3 = new WavTrackImpl(url2, noCache);
assertEquals(expectedFormat.getName(), track3.getFormat().getName());
assertEquals(expectedDuration, track3.getDuration());
track3.close();

verify(mockConnection, times(3)).getInputStream();
}
Expand Down

0 comments on commit 14407dd

Please sign in to comment.