Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BACKLOG-37664 POC to make pentaho server work with vfs metastore #8894

Open
wants to merge 4 commits into
base: metastore-vfsplugin
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,14 @@
import org.pentaho.di.core.exception.IdNotFoundException;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.exception.KettleFileException;
import org.pentaho.di.core.exception.KettlePluginException;
import org.pentaho.di.core.exception.KettleSecurityException;
import org.pentaho.di.core.extension.ExtensionPointHandler;
import org.pentaho.di.core.extension.KettleExtensionPoint;
import org.pentaho.di.core.logging.LogChannel;
import org.pentaho.di.core.logging.LogChannelInterface;
import org.pentaho.di.core.osgi.api.MetastoreLocatorOsgi;
import org.pentaho.di.core.service.PluginServiceLoader;
import org.pentaho.di.core.util.Utils;
import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.di.imp.Import;
Expand Down Expand Up @@ -113,6 +116,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.EnumMap;
Expand Down Expand Up @@ -146,6 +150,7 @@ public class PurRepository extends AbstractRepository implements Repository, Rec

// Kettle property that when set to false disabled the lazy repository access
public static final String LAZY_REPOSITORY = "KETTLE_LAZY_REPOSITORY";
public static final String VFS_METASTORE_PROVIDER = "VfsMetastoreProvider";

private static Class<?> PKG = PurRepository.class;

Expand Down Expand Up @@ -221,7 +226,7 @@ public class PurRepository extends AbstractRepository implements Repository, Rec

private String connectMessage = null;

protected PurRepositoryMetaStore metaStore;
protected IMetaStore metaStore;

private ConnectionManager connectionManager = ConnectionManager.getInstance();

Expand All @@ -237,12 +242,16 @@ protected enum RepositoryServers {
private static final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private static final ReadWriteLock sharedObjectsLock = new ReentrantReadWriteLock();

private final MetastoreLocatorOsgi metastoreLocator;


// ~ Constructors ====================================================================================================

public PurRepository() {
public PurRepository() throws KettlePluginException {
super();
initSharedObjectAssemblerMap();
Collection<MetastoreLocatorOsgi> metastoreLocators = PluginServiceLoader.loadServices( MetastoreLocatorOsgi.class );
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is perfectly fine for POC, but did you ran into some problem looking for MetastoreLocator instead

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes using metastore locator instead of the osgi one required a new dependency that caused problems

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yes, I remember now. I was able to look up MetastoreLocator and added dependency for pentaho-metastore-locator-api in pom.xml and it worked fine

metastoreLocator = metastoreLocators.stream().findFirst().get();
}

// ~ Methods =========================================================================================================
Expand Down Expand Up @@ -290,7 +299,10 @@ public RootRef getRootRef() {
purRepositoryServiceRegistry.registerService( ILockService.class, new UnifiedRepositoryLockService( pur ) );
purRepositoryServiceRegistry
.registerService( IAclService.class, new UnifiedRepositoryConnectionAclService( pur ) );
metaStore = new PurRepositoryMetaStore( this );
metaStore = metastoreLocator.getExplicitMetastore( VFS_METASTORE_PROVIDER );
if ( metaStore == null ) {
metaStore = new PurRepositoryMetaStore( this );
}
try {
metaStore.createNamespace( PentahoDefaults.NAMESPACE );
} catch ( MetaStoreException e ) {
Expand Down Expand Up @@ -342,12 +354,12 @@ public RootRef getRootRef() {
if ( log.isDetailed() ) {
log.logDetailed( BaseMessages.getString( PKG, "PurRepositoryMetastore.Create.Message" ) );
}
metaStore = new PurRepositoryMetaStore( this );
IMetaStore activeMetaStore = metaStore;
if ( activeMetaStore != null ) {
final IMetaStore connectedMetaStore = activeMetaStore;
connectionManager.setMetastoreSupplier( () -> connectedMetaStore );
metaStore = metastoreLocator.getExplicitMetastore( VFS_METASTORE_PROVIDER );
if ( metaStore == null ) {
metaStore = new PurRepositoryMetaStore( this );
}
final IMetaStore connectedMetaStore = metaStore;
connectionManager.setMetastoreSupplier( () -> connectedMetaStore );

// Create the default Pentaho namespace if it does not exist
try {
Expand Down Expand Up @@ -379,16 +391,17 @@ public RootRef getRootRef() {

@Override public void disconnect() {
connected = false;
metaStore = null;
IMetaStore activeMetaStore = null;
try {
activeMetaStore = MetaStoreConst.openLocalPentahoMetaStore();
} catch ( MetaStoreException e ) {
activeMetaStore = null;
}
if ( activeMetaStore != null ) {
final IMetaStore connectedMetaStore = activeMetaStore;
connectionManager.setMetastoreSupplier( () -> connectedMetaStore );
if ( metaStore instanceof PurRepositoryMetaStore ) {
metaStore = null;
try {
metaStore = MetaStoreConst.openLocalPentahoMetaStore();
} catch ( MetaStoreException e ) {
metaStore = null;
}
if ( metaStore != null ) {
final IMetaStore connectedMetaStore = metaStore;
connectionManager.setMetastoreSupplier( () -> connectedMetaStore );
}
}

purRepositoryConnector.disconnect();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.junit.Before;
import org.junit.Test;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.exception.KettlePluginException;
import org.pentaho.di.repository.RepositoryOperation;
import org.pentaho.di.repository.UserInfo;
import org.pentaho.di.ui.repository.pur.services.IAbsSecurityProvider;
Expand All @@ -32,7 +33,7 @@ public class AbsSecurityProviderTest {
private AbsSecurityProvider provider;

@Before
public void setUp() {
public void setUp() throws KettlePluginException {
provider = new AbsSecurityProvider( new PurRepository(), new PurRepositoryMeta(), new UserInfo(),
mock( ServiceManager.class ) );
provider = spy( provider );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.mockito.Mockito;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.exception.KettlePluginException;
import org.pentaho.di.core.extension.ExtensionPointInterface;
import org.pentaho.di.core.extension.ExtensionPointPluginType;
import org.pentaho.di.core.extension.KettleExtensionPoint;
Expand Down Expand Up @@ -502,7 +503,7 @@ public void testSaveTransOrJob() throws KettleException {
}

@Test
public void testGetJobPathWithoutExtension() {
public void testGetJobPathWithoutExtension() throws KettlePluginException {
PurRepository pur = new PurRepository();
RepositoryDirectoryInterface rdi = mock( RepositoryDirectoryInterface.class );
doReturn( mock( ObjectId.class ) ).when( rdi ).getObjectId();
Expand All @@ -512,7 +513,7 @@ public void testGetJobPathWithoutExtension() {
}

@Test
public void testGetJobPathWithExtension() {
public void testGetJobPathWithExtension() throws KettlePluginException {
PurRepository pur = new PurRepository();
RepositoryDirectoryInterface rdi = mock( RepositoryDirectoryInterface.class );
doReturn( mock( ObjectId.class ) ).when( rdi ).getObjectId();
Expand All @@ -522,7 +523,7 @@ public void testGetJobPathWithExtension() {
}

@Test
public void testGetTransPathWithoutExtension() {
public void testGetTransPathWithoutExtension() throws KettlePluginException {
PurRepository pur = new PurRepository();
RepositoryDirectoryInterface rdi = mock( RepositoryDirectoryInterface.class );
doReturn( mock( ObjectId.class ) ).when( rdi ).getObjectId();
Expand All @@ -532,7 +533,7 @@ public void testGetTransPathWithoutExtension() {
}

@Test
public void testGetTransPathWithExtension() {
public void testGetTransPathWithExtension() throws KettlePluginException {
PurRepository pur = new PurRepository();
RepositoryDirectoryInterface rdi = mock( RepositoryDirectoryInterface.class );
doReturn( mock( ObjectId.class ) ).when( rdi ).getObjectId();
Expand Down Expand Up @@ -618,7 +619,7 @@ public void testCreateValidRepositoryDirectoryForNotRoot() throws KettleExceptio

@Test
@SuppressWarnings( {"squid:S1075", "squid:S3655"} )
public void testGetURI() throws URISyntaxException {
public void testGetURI() throws URISyntaxException, KettlePluginException {
PurRepository purRepo = new PurRepository();
PurRepositoryMeta repoMeta = new PurRepositoryMeta();
repoMeta.setRepositoryLocation( new PurRepositoryLocation( "http://localhost:8080/pentaho" ) );
Expand Down