diff --git a/src/main/resources/edda.properties b/src/main/resources/edda.properties index bae0a946..b8b703a7 100644 --- a/src/main/resources/edda.properties +++ b/src/main/resources/edda.properties @@ -50,7 +50,12 @@ # Set the region for the AWS endpoints # # -edda.region=us-west-1 +edda.region=us-east-1 +edda.aws.assumeRoleArn= +edda.aws.assumerole.enabled=true +#edda.aws.accessKey= +#edda.aws.secretKey= + # # Set a region for a specific account diff --git a/src/main/scala/com/netflix/edda/Crawler.scala b/src/main/scala/com/netflix/edda/Crawler.scala index 48c45ac3..e9af25e0 100644 --- a/src/main/scala/com/netflix/edda/Crawler.scala +++ b/src/main/scala/com/netflix/edda/Crawler.scala @@ -64,6 +64,8 @@ abstract class Crawler extends Observable { lazy val throttle_delay = Utils.getProperty("edda.crawler", "throttle.delay", name, "200") lazy val retry_max = Utils.getProperty("edda.crawler", "throttle.maxDelayMultiplier", name, "225") lazy val request_delay = Utils.getProperty("edda.crawler", "requestDelay", name, "0") + lazy val assumeRoleEnabled = Utils.getProperty("edda.aws", "assumerole.enabled", name, "false").get.toBoolean + /* number of retries attempted */ var retry_count = 0 diff --git a/src/main/scala/com/netflix/edda/aws/AwsClient.scala b/src/main/scala/com/netflix/edda/aws/AwsClient.scala index 8a3be9b6..79b1c51d 100644 --- a/src/main/scala/com/netflix/edda/aws/AwsClient.scala +++ b/src/main/scala/com/netflix/edda/aws/AwsClient.scala @@ -15,30 +15,23 @@ */ package com.netflix.edda.aws -import com.netflix.edda.Utils - -import com.amazonaws.auth.AWSCredentials -import com.amazonaws.auth.BasicAWSCredentials -import com.amazonaws.auth.AWSCredentialsProvider -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain -import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider -import com.amazonaws.auth.profile.ProfileCredentialsProvider - -import com.amazonaws.services.ec2.AmazonEC2Client +import com.amazonaws.auth._ import 
com.amazonaws.services.autoscaling.AmazonAutoScalingClient +import com.amazonaws.services.cloudformation.AmazonCloudFormationClient +import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient +import com.amazonaws.services.ec2.AmazonEC2Client +import com.amazonaws.services.elasticache.AmazonElastiCacheClient import com.amazonaws.services.elasticloadbalancing.AmazonElasticLoadBalancingClient import com.amazonaws.services.elasticloadbalancingv2.{AmazonElasticLoadBalancingClient => AmazonElasticLoadBalancingV2Client} import com.amazonaws.services.identitymanagement.AmazonIdentityManagementClient -import com.amazonaws.services.s3.AmazonS3Client -import com.amazonaws.services.sqs.AmazonSQSClient -import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient -import com.amazonaws.services.route53.AmazonRoute53Client import com.amazonaws.services.rds.AmazonRDSClient -import com.amazonaws.services.elasticache.AmazonElastiCacheClient -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient -import com.amazonaws.services.cloudformation.AmazonCloudFormationClient +import com.amazonaws.services.route53.AmazonRoute53Client +import com.amazonaws.services.s3.AmazonS3Client import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient import com.amazonaws.services.securitytoken.model.GetCallerIdentityRequest +import com.amazonaws.services.sqs.AmazonSQSClient +import com.netflix.edda.Utils object AwsClient { def mkCredentialProvider(accessKey: String, secretKey: String, arn: String): AWSCredentialsProvider = { @@ -51,11 +44,12 @@ object AwsClient { } } if (arn.isEmpty) { - provider + provider } else { - new STSAssumeRoleSessionCredentialsProvider(provider, arn, "edda") + new STSAssumeRoleSessionCredentialsProvider(provider, arn, "edda") } } + } @@ -99,6 +93,11 @@ class AwsClient(val provider: AWSCredentialsProvider, val region: String) { this(AwsClient.mkCredentialProvider(accessKey,secretKey, 
""), region) + /* Basic Credential Provider */ + def getBasicCredsProvider = { + InstanceProfileCredentialsProvider.getInstance() + } + /** generate a resource arn */ def arn(resourceAPI: String, resourceType: String, resourceName: String): String = { "arn:aws:" + resourceAPI + ":" + region + ":" + account + ":" + resourceType + arnSeperator(resourceType) + resourceName @@ -120,36 +119,41 @@ class AwsClient(val provider: AWSCredentialsProvider, val region: String) { } /** get [[http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/ec2/AmazonEC2Client.html com.amazonaws.services.ec2.AmazonEC2Client]] object */ - def ec2 = { - val client = new AmazonEC2Client(provider) + def ec2(needAssumeRoleProvider : Boolean = false) = { + val credsProvider = if(needAssumeRoleProvider) provider else getBasicCredsProvider + val client = new AmazonEC2Client(credsProvider) client.setEndpoint("ec2." + region + ".amazonaws.com") client } /** get [[http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/autoscaling/AmazonAutoScalingClient.html com.amazonaws.services.autoscaling.AmazonAutoScalingClient]] object */ - def asg = { - val client = new AmazonAutoScalingClient(provider) + def asg(needAssumeRoleProvider : Boolean = false) = { + val credsProvider = if(needAssumeRoleProvider) provider else getBasicCredsProvider + val client = new AmazonAutoScalingClient(credsProvider) client.setEndpoint("autoscaling." 
+ region + ".amazonaws.com") client } /** get [[http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticloadbalancing/AmazonElasticLoadBalancingClient.html com.amazonaws.services.elasticloadbalancing.AmazonElasticLoadBalancingClient]] object */ - def elb = { - val client = new AmazonElasticLoadBalancingClient(provider) + def elb(needAssumeRoleProvider : Boolean = false) = { + val credsProvider = if(needAssumeRoleProvider) provider else getBasicCredsProvider + val client = new AmazonElasticLoadBalancingClient(credsProvider) client.setEndpoint("elasticloadbalancing." + region + ".amazonaws.com") client } /** get [[http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/elasticloadbalancingv2/AmazonElasticLoadBalancingClient.html com.amazonaws.services.elasticloadbalancingv2.AmazonElasticLoadBalancingClient]] object */ - def elbv2 = { - val client = new AmazonElasticLoadBalancingV2Client(provider) + def elbv2(needAssumeRoleProvider : Boolean = false) = { + val credsProvider = if(needAssumeRoleProvider) provider else getBasicCredsProvider + val client = new AmazonElasticLoadBalancingV2Client(credsProvider) client.setEndpoint("elasticloadbalancing." 
+ region + ".amazonaws.com") client } /** get [[http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/s3/AmazonS3Client.html com.amazonaws.services.s3.AmazonS3Client]] object */ - def s3 = { - val client = new AmazonS3Client(provider) + def s3(needAssumeRoleProvider : Boolean = false) = { + val credsProvider = if(needAssumeRoleProvider) provider else getBasicCredsProvider + val client = new AmazonS3Client(credsProvider) if (region == "us-east-1") client.setEndpoint("s3.amazonaws.com") else @@ -158,8 +162,9 @@ class AwsClient(val provider: AWSCredentialsProvider, val region: String) { } /** get [[http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/identitymanagement/AmazonIdentityManagementClient.html com.amazonaws.services.identitymanagement.AmazonIdentityManagementClient]] object */ - def identitymanagement = { - val client = new AmazonIdentityManagementClient(provider) + def identitymanagement(needAssumeRoleProvider : Boolean = false) = { + val credsProvider = if(needAssumeRoleProvider) provider else getBasicCredsProvider + val client = new AmazonIdentityManagementClient(credsProvider) if (region == "us-gov") client.setEndpoint("iam.us-gov.amazonaws.com") else @@ -168,47 +173,54 @@ class AwsClient(val provider: AWSCredentialsProvider, val region: String) { } /** get [[http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/sqs/AmazonSQSClient.html com.amazonaws.services.sqs.AmazonSQSClient]] object */ - def sqs = { - val client = new AmazonSQSClient(provider) + def sqs(needAssumeRoleProvider : Boolean = false) = { + val credsProvider = if(needAssumeRoleProvider) provider else getBasicCredsProvider + val client = new AmazonSQSClient(credsProvider) client.setEndpoint("sqs." 
+ region + ".amazonaws.com") client } /** get [[http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/cloudwatch/AmazonCloudWatchClient.html com.amazonaws.services.cloudwatch.AmazonCloudWatchClient]] object */ - def cw = { - val client = new AmazonCloudWatchClient(provider) + def cw(needAssumeRoleProvider : Boolean = false) = { + val credsProvider = if(needAssumeRoleProvider) provider else getBasicCredsProvider + val client = new AmazonCloudWatchClient(credsProvider) client.setEndpoint("monitoring." + region + ".amazonaws.com") client } - /** get [[http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/route53/AmazonRoute53Client.html com.amazonaws.services.route53.AmazonRoute53Client]] object */ - def route53 = { - val client = new AmazonRoute53Client(provider) - client.setEndpoint("route53.amazonaws.com") - client - } + /** get [[http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/route53/AmazonRoute53Client.html com.amazonaws.services.route53.AmazonRoute53Client]] object */ + def route53(needAssumeRoleProvider : Boolean = false) = { + val credsProvider = if(needAssumeRoleProvider) provider else getBasicCredsProvider + val client = new AmazonRoute53Client(credsProvider) + client.setEndpoint("route53.amazonaws.com") + client + } - def rds = { - val client = new AmazonRDSClient(provider) - client.setEndpoint("rds." + region + ".amazonaws.com") - client - } + def rds(needAssumeRoleProvider : Boolean = false) = { + val credsProvider = if(needAssumeRoleProvider) provider else getBasicCredsProvider + val client = new AmazonRDSClient(credsProvider) + client.setEndpoint("rds." 
+ region + ".amazonaws.com") + client + } - def elasticache = { - val client = new AmazonElastiCacheClient(provider) + def elasticache(needAssumeRoleProvider : Boolean = false) = { + val credsProvider = if(needAssumeRoleProvider) provider else getBasicCredsProvider + val client = new AmazonElastiCacheClient(credsProvider) client.setEndpoint("elasticache." + region + ".amazonaws.com") client - } + } - def dynamo = { - val client = new AmazonDynamoDBClient(provider) - client.setEndpoint("dynamodb." + region + ".amazonaws.com") - client - } + def dynamo(needAssumeRoleProvider : Boolean = false) = { + val credsProvider = if(needAssumeRoleProvider) provider else getBasicCredsProvider + val client = new AmazonDynamoDBClient(credsProvider) + client.setEndpoint("dynamodb." + region + ".amazonaws.com") + client + } - def cloudformation = { - val client = new AmazonCloudFormationClient(provider) + def cloudformation(needAssumeRoleProvider : Boolean = false) = { + val credsProvider = if(needAssumeRoleProvider) provider else getBasicCredsProvider + val client = new AmazonCloudFormationClient(credsProvider) client.setEndpoint("cloudformation." 
+ region + ".amazonaws.com") client - } -} + } +} \ No newline at end of file diff --git a/src/main/scala/com/netflix/edda/aws/AwsCrawlers.scala b/src/main/scala/com/netflix/edda/aws/AwsCrawlers.scala index ecda7ae4..dcec36e3 100644 --- a/src/main/scala/com/netflix/edda/aws/AwsCrawlers.scala +++ b/src/main/scala/com/netflix/edda/aws/AwsCrawlers.scala @@ -46,10 +46,8 @@ import com.amazonaws.services.identitymanagement.model._ import com.amazonaws.services.s3.model.ListBucketsRequest import com.amazonaws.services.sqs.model.ListQueuesRequest import com.amazonaws.services.sqs.model.GetQueueAttributesRequest - import com.amazonaws.services.cloudformation.model.DescribeStacksRequest import com.amazonaws.services.cloudformation.model.ListStackResourcesRequest - import com.amazonaws.services.cloudwatch.model.DescribeAlarmsRequest import com.amazonaws.services.autoscaling.model.DescribeAutoScalingGroupsRequest import com.amazonaws.services.autoscaling.model.DescribeLaunchConfigurationsRequest @@ -63,9 +61,7 @@ import com.amazonaws.services.elasticloadbalancingv2.model.{DescribeLoadBalancer import com.amazonaws.services.elasticloadbalancingv2.model.DescribeTargetGroupsRequest import com.amazonaws.services.route53.model.ListHostedZonesRequest import com.amazonaws.services.route53.model.ListResourceRecordSetsRequest - import com.amazonaws.services.elasticache.model.DescribeCacheClustersRequest - import com.amazonaws.services.rds.model.DescribeDBInstancesRequest import com.amazonaws.services.rds.model.DescribeDBSubnetGroupsRequest import com.amazonaws.services.rds.model.ListTagsForResourceRequest @@ -74,8 +70,8 @@ import collection.JavaConverters._ import java.util.concurrent.Executors import java.util.concurrent.Callable +import com.amazonaws.services.cloudformation.AmazonCloudFormationClient import org.slf4j.LoggerFactory - import com.amazonaws.services.rds.model.DescribeDBInstancesRequest import com.amazonaws.services.rds.model.ListTagsForResourceRequest import 
com.amazonaws.services.elasticache.model.DescribeCacheClustersRequest @@ -160,9 +156,13 @@ abstract class AwsIterator extends Iterator[Seq[Record]] { class AwsAddressCrawler(val name: String, val ctx: AwsCrawler.Context) extends Crawler { val request = new DescribeAddressesRequest - override def doCrawl()(implicit req: RequestId) = - backoffRequest { ctx.awsClient.ec2.describeAddresses(request).getAddresses.asScala.map( + private def doNonAssumeCrawl()(implicit req: RequestId) = + backoffRequest { ctx.awsClient.ec2().describeAddresses(request).getAddresses.asScala.map( item => Record(item.getPublicIp, ctx.beanMapper(item))) }.toSeq + + override def doCrawl()(implicit req: RequestId) = + backoffRequest { ctx.awsClient.ec2(true).describeAddresses(request).getAddresses.asScala.map( + item => Record(item.getPublicIp, ctx.beanMapper(item))) }.toSeq ++ (if(assumeRoleEnabled) doNonAssumeCrawl() else Seq.empty[Record]) } /** crawler for AutoScalingGroups @@ -176,11 +176,12 @@ class AwsAutoScalingGroupCrawler(val name: String, val ctx: AwsCrawler.Context) request.setMaxRecords(50) lazy val abortWithoutTags = Utils.getProperty("edda.crawler", "abortWithoutTags", name, "false") - override def doCrawl()(implicit req: RequestId) = { + + private def doNonAssumeCrawl()(implicit req: RequestId) = { var tagCount = 0 val it = new AwsIterator() { def next() = { - val response = backoffRequest { ctx.awsClient.asg.describeAutoScalingGroups(request.withNextToken(this.nextToken.get)) } + val response = backoffRequest { ctx.awsClient.asg().describeAutoScalingGroups(request.withNextToken(this.nextToken.get)) } this.nextToken = Option(response.getNextToken) response.getAutoScalingGroups.asScala.map( item => { @@ -197,6 +198,28 @@ class AwsAutoScalingGroupCrawler(val name: String, val ctx: AwsCrawler.Context) } list } + + override def doCrawl()(implicit req: RequestId) = { + var tagCount = 0 + val it = new AwsIterator() { + def next() = { + val response = backoffRequest { 
ctx.awsClient.asg(true).describeAutoScalingGroups(request.withNextToken(this.nextToken.get)) } + this.nextToken = Option(response.getNextToken) + response.getAutoScalingGroups.asScala.map( + item => { + tagCount += item.getTags.size + Record(item.getAutoScalingGroupName, new DateTime(item.getCreatedTime), ctx.beanMapper(item)) + }).toList + } + } + val list = it.toList.flatten + if (tagCount == 0) { + if (abortWithoutTags.get.toBoolean) { + throw new java.lang.RuntimeException("no tags found for any record in " + name + ", ignoring crawl results") + } else if (logger.isWarnEnabled) logger.warn(s"$req no tags found for any record in $name. If you expect at least one tag then set: edda.crawler.$name.abortWithoutTags=true") + } + list ++ (if(assumeRoleEnabled) doNonAssumeCrawl() else List.empty[Record]) + } } /** crawler for ASG Policies @@ -209,10 +232,10 @@ class AwsScalingPolicyCrawler(val name: String, val ctx: AwsCrawler.Context) ext val request = new DescribePoliciesRequest request.setMaxRecords(50) - override def doCrawl()(implicit req: RequestId) = { + private def doNonAssumeCrawl()(implicit req: RequestId) = { val it = new AwsIterator() { def next() = { - val response = backoffRequest { ctx.awsClient.asg.describePolicies(request.withNextToken(this.nextToken.get)) } + val response = backoffRequest { ctx.awsClient.asg().describePolicies(request.withNextToken(this.nextToken.get)) } this.nextToken = Option(response.getNextToken) response.getScalingPolicies.asScala.map( item => { @@ -222,6 +245,20 @@ class AwsScalingPolicyCrawler(val name: String, val ctx: AwsCrawler.Context) ext } it.toList.flatten } + + override def doCrawl()(implicit req: RequestId) = { + val it = new AwsIterator() { + def next() = { + val response = backoffRequest { ctx.awsClient.asg(true).describePolicies(request.withNextToken(this.nextToken.get)) } + this.nextToken = Option(response.getNextToken) + response.getScalingPolicies.asScala.map( + item => { + Record(item.getPolicyName, 
ctx.beanMapper(item)) + }).toList + } + } + it.toList.flatten ++ (if(assumeRoleEnabled) doNonAssumeCrawl() else Seq.empty[Record]) + } } /** crawler for ASG Activities @@ -234,10 +271,10 @@ class AwsScalingActivitiesCrawler(val name: String, val ctx: AwsCrawler.Context) val request = new DescribeScalingActivitiesRequest request.setMaxRecords(50) - override def doCrawl()(implicit req: RequestId) = { + private def doNonAssumeCrawl()(implicit req: RequestId) = { val it = new AwsIterator() { def next() = { - val response = backoffRequest { ctx.awsClient.asg.describeScalingActivities(request.withNextToken(this.nextToken.get)) } + val response = backoffRequest { ctx.awsClient.asg().describeScalingActivities(request.withNextToken(this.nextToken.get)) } this.nextToken = Option(response.getNextToken) response.getActivities.asScala.map( item => { @@ -247,6 +284,20 @@ class AwsScalingActivitiesCrawler(val name: String, val ctx: AwsCrawler.Context) } it.toList.flatten } + + override def doCrawl()(implicit req: RequestId) = { + val it = new AwsIterator() { + def next() = { + val response = backoffRequest { ctx.awsClient.asg(true).describeScalingActivities(request.withNextToken(this.nextToken.get)) } + this.nextToken = Option(response.getNextToken) + response.getActivities.asScala.map( + item => { + Record(item.getActivityId, ctx.beanMapper(item)) + }).toList + } + } + it.toList.flatten ++ (if(assumeRoleEnabled) doNonAssumeCrawl() else Seq.empty[Record]) + } } /** crawler for ASG Scheduled Actions @@ -259,10 +310,10 @@ class AwsScheduledActionsCrawler(val name: String, val ctx: AwsCrawler.Context) val request = new DescribeScheduledActionsRequest request.setMaxRecords(50) - override def doCrawl()(implicit req: RequestId) = { + private def doNonAssumeCrawl()(implicit req: RequestId) = { val it = new AwsIterator() { def next() = { - val response = backoffRequest { ctx.awsClient.asg.describeScheduledActions(request.withNextToken(this.nextToken.get)) } + val response = 
backoffRequest { ctx.awsClient.asg().describeScheduledActions(request.withNextToken(this.nextToken.get)) } this.nextToken = Option(response.getNextToken) response.getScheduledUpdateGroupActions.asScala.map( item => { @@ -272,6 +323,20 @@ class AwsScheduledActionsCrawler(val name: String, val ctx: AwsCrawler.Context) } it.toList.flatten } + + override def doCrawl()(implicit req: RequestId) = { + val it = new AwsIterator() { + def next() = { + val response = backoffRequest { ctx.awsClient.asg(true).describeScheduledActions(request.withNextToken(this.nextToken.get)) } + this.nextToken = Option(response.getNextToken) + response.getScheduledUpdateGroupActions.asScala.map( + item => { + Record(item.getScheduledActionARN, ctx.beanMapper(item)) + }).toList + } + } + it.toList.flatten ++ (if(assumeRoleEnabled) doNonAssumeCrawl() else Seq.empty[Record]) + } } /** crawler for ASG VPCs @@ -283,13 +348,21 @@ class AwsVpcCrawler(val name: String, val ctx: AwsCrawler.Context) extends Crawl private[this] val logger = LoggerFactory.getLogger(getClass) val request = new DescribeVpcsRequest - override def doCrawl()(implicit req: RequestId) = { - val response = backoffRequest { ctx.awsClient.ec2.describeVpcs() } + private def doNonAssumeCrawl()(implicit req: RequestId) = { + val response = backoffRequest { ctx.awsClient.ec2().describeVpcs() } response.getVpcs.asScala.map( item => { Record(item.getVpcId, ctx.beanMapper(item)) }).toList } + + override def doCrawl()(implicit req: RequestId) = { + val response = backoffRequest { ctx.awsClient.ec2(true).describeVpcs() } + response.getVpcs.asScala.map( + item => { + Record(item.getVpcId, ctx.beanMapper(item)) + }).toList ++ (if(assumeRoleEnabled) doNonAssumeCrawl() else Seq.empty[Record]) + } } @@ -303,10 +376,10 @@ class AwsAlarmCrawler(val name: String, val ctx: AwsCrawler.Context) extends Cra val request = new DescribeAlarmsRequest request.setMaxRecords(100) - override def doCrawl()(implicit req: RequestId) = { + private def 
doNonAssumeCrawl()(implicit req: RequestId) = { val it = new AwsIterator() { def next() = { - val response = backoffRequest { ctx.awsClient.cw.describeAlarms(request.withNextToken(this.nextToken.get)) } + val response = backoffRequest { ctx.awsClient.cw().describeAlarms(request.withNextToken(this.nextToken.get)) } this.nextToken = Option(response.getNextToken) response.getMetricAlarms.asScala.map( item => { @@ -316,6 +389,20 @@ class AwsAlarmCrawler(val name: String, val ctx: AwsCrawler.Context) extends Cra } it.toList.flatten } + + override def doCrawl()(implicit req: RequestId) = { + val it = new AwsIterator() { + def next() = { + val response = backoffRequest { ctx.awsClient.cw(true).describeAlarms(request.withNextToken(this.nextToken.get)) } + this.nextToken = Option(response.getNextToken) + response.getMetricAlarms.asScala.map( + item => { + Record(item.getAlarmName, ctx.beanMapper(item)) + }).toList + } + } + it.toList.flatten ++ (if(assumeRoleEnabled) doNonAssumeCrawl() else Seq.empty[Record]) + } } /** crawler for Images @@ -330,9 +417,9 @@ class AwsImageCrawler(val name: String, val ctx: AwsCrawler.Context) extends Cra val request = new DescribeImagesRequest lazy val abortWithoutTags = Utils.getProperty("edda.crawler", "abortWithoutTags", name, "false") - override def doCrawl()(implicit req: RequestId) = { + private def doNonAssumeCrawl()(implicit req: RequestId) = { var tagCount = 0 - val list = backoffRequest { ctx.awsClient.ec2.describeImages(request).getImages }.asScala.map( + val list = backoffRequest { ctx.awsClient.ec2().describeImages(request).getImages }.asScala.map( item => { tagCount += item.getTags.size Record(item.getImageId, ctx.beanMapper(item)) @@ -342,6 +429,19 @@ class AwsImageCrawler(val name: String, val ctx: AwsCrawler.Context) extends Cra } list } + + override def doCrawl()(implicit req: RequestId) = { + var tagCount = 0 + val list = backoffRequest { ctx.awsClient.ec2(true).describeImages(request).getImages }.asScala.map( + item => { 
+ tagCount += item.getTags.size + Record(item.getImageId, ctx.beanMapper(item)) + }).toSeq + if (tagCount == 0 && abortWithoutTags.get.toBoolean) { + throw new java.lang.RuntimeException("no tags found for " + name + ", ignoring crawl results") + } + list ++ (if(assumeRoleEnabled) doNonAssumeCrawl() else Seq.empty[Record]) + } } /** crawler for LoadBalancers @@ -354,10 +454,10 @@ class AwsLoadBalancerCrawler(val name: String, val ctx: AwsCrawler.Context) exte val request = new DescribeLoadBalancersRequest request.setPageSize(400) - override def doCrawl()(implicit req: RequestId) = { + private def doNonAssumeCrawl()(implicit req: RequestId) = { val it = new AwsIterator() { def next() = { - val response = backoffRequest { ctx.awsClient.elb.describeLoadBalancers(request.withMarker(this.nextToken.get)) } + val response = backoffRequest { ctx.awsClient.elb().describeLoadBalancers(request.withMarker(this.nextToken.get)) } this.nextToken = Option(response.getNextMarker) response.getLoadBalancerDescriptions.asScala.map( item => { @@ -380,7 +480,7 @@ class AwsLoadBalancerCrawler(val name: String, val ctx: AwsCrawler.Context) exte } try { val request = new com.amazonaws.services.elasticloadbalancing.model.DescribeTagsRequest().withLoadBalancerNames(names.asJava) - val response = backoffRequest { ctx.awsClient.elb.describeTags(request) } + val response = backoffRequest { ctx.awsClient.elb().describeTags(request) } val responseList = backoffRequest { response.getTagDescriptions().asScala.map( item => { ctx.beanMapper(item) @@ -403,6 +503,57 @@ class AwsLoadBalancerCrawler(val name: String, val ctx: AwsCrawler.Context) exte } buffer.toList } + + + override def doCrawl()(implicit req: RequestId) = { + val it = new AwsIterator() { + def next() = { + val response = backoffRequest { ctx.awsClient.elb(true).describeLoadBalancers(request.withMarker(this.nextToken.get)) } + this.nextToken = Option(response.getNextMarker) + response.getLoadBalancerDescriptions.asScala.map( + item => { 
+ Record(item.getLoadBalancerName, new DateTime(item.getCreatedTime), ctx.beanMapper(item)) + }).toList + } + } + val initial = it.map(_.asInstanceOf[Record]).toSeq.grouped(20).toList + + backoffRequest { ctx.awsClient.loadAccountNum() } + + var buffer = new ListBuffer[Record]() + + for (group <- initial) { + var names = new ListBuffer[String]() + + for (rec <- group) { + val data = rec.toMap("data").asInstanceOf[Map[String,String]] + names += data("loadBalancerName") + } + try { + val request = new com.amazonaws.services.elasticloadbalancing.model.DescribeTagsRequest().withLoadBalancerNames(names.asJava) + val response = backoffRequest { ctx.awsClient.elb(true).describeTags(request) } + val responseList = backoffRequest { response.getTagDescriptions().asScala.map( + item => { + ctx.beanMapper(item) + }).toSeq + } + + for (rec <- group) { + val data = rec.toMap("data").asInstanceOf[Map[String,String]] + for (response <- responseList) { + if (response.asInstanceOf[Map[String,Any]]("loadBalancerName") == data("loadBalancerName")) { + buffer += rec.copy(data = data.asInstanceOf[Map[String,Any]] ++ Map("tags" -> response.asInstanceOf[Map[String,Any]]("tags"))) + } + } + } + } catch { + case e: Exception => { + logger.error("error retrieving tags for an elb", e) + } + } + } + buffer.toList ++ (if(assumeRoleEnabled) doNonAssumeCrawl() else Seq.empty[Record]) + } } /** crawler for LoadBalancers (version 2) @@ -414,17 +565,17 @@ class AwsLoadBalancerV2Crawler(val name: String, val ctx: AwsCrawler.Context) ex private[this] val logger = LoggerFactory.getLogger(getClass) val request = new DescribeLoadBalancersV2Request - override def doCrawl()(implicit req: RequestId) = { + private def doNonAssumeCrawl()(implicit req: RequestId) = { val it = new AwsIterator { override def next() = { val response = backoffRequest { - ctx.awsClient.elbv2.describeLoadBalancers(request.withMarker(this.nextToken.get)) + 
ctx.awsClient.elbv2().describeLoadBalancers(request.withMarker(this.nextToken.get)) } this.nextToken = Option(response.getNextMarker) response.getLoadBalancers.asScala.map( item => { val lr = new DescribeListenersRequest().withLoadBalancerArn(item.getLoadBalancerArn) - val listeners = backoffRequest { ctx.awsClient.elbv2.describeListeners(lr) }.getListeners + val listeners = backoffRequest { ctx.awsClient.elbv2().describeListeners(lr) }.getListeners // If there are no listeners AWS returns null instead of an empty list val listenersList = if (listeners == null) Nil else listeners.asScala.map(item => ctx.beanMapper(item)).toList val data = ctx.beanMapper(item).asInstanceOf[Map[String, Any]] ++ Map("listeners" -> listenersList) @@ -436,6 +587,29 @@ class AwsLoadBalancerV2Crawler(val name: String, val ctx: AwsCrawler.Context) ex } it.toList.flatten } + + override def doCrawl()(implicit req: RequestId) = { + val it = new AwsIterator { + override def next() = { + val response = backoffRequest { + ctx.awsClient.elbv2(true).describeLoadBalancers(request.withMarker(this.nextToken.get)) + } + this.nextToken = Option(response.getNextMarker) + response.getLoadBalancers.asScala.map( + item => { + val lr = new DescribeListenersRequest().withLoadBalancerArn(item.getLoadBalancerArn) + val listeners = backoffRequest { ctx.awsClient.elbv2(true).describeListeners(lr) }.getListeners + // If there are no listeners AWS returns null instead of an empty list + val listenersList = if (listeners == null) Nil else listeners.asScala.map(item => ctx.beanMapper(item)).toList + val data = ctx.beanMapper(item).asInstanceOf[Map[String, Any]] ++ Map("listeners" -> listenersList) + + Record(item.getLoadBalancerName, new DateTime(item.getCreatedTime), data) + } + ).toList + } + } + it.toList.flatten ++ (if(assumeRoleEnabled) doNonAssumeCrawl() else Seq.empty[Record]) + } } /** crawler for TargetGroups @@ -447,11 +621,11 @@ class AwsTargetGroupCrawler(val name: String, val ctx: AwsCrawler.Context) 
exten private[this] val logger = LoggerFactory.getLogger(getClass) val request = new DescribeTargetGroupsRequest - override def doCrawl()(implicit req: RequestId) = { + private def doNonAssumeCrawl()(implicit req: RequestId) = { val it = new AwsIterator { override def next() = { val response = backoffRequest { - ctx.awsClient.elbv2.describeTargetGroups(request.withMarker(this.nextToken.get)) + ctx.awsClient.elbv2().describeTargetGroups(request.withMarker(this.nextToken.get)) } this.nextToken = Option(response.getNextMarker) response.getTargetGroups.asScala.map( @@ -461,6 +635,21 @@ class AwsTargetGroupCrawler(val name: String, val ctx: AwsCrawler.Context) exten } it.toList.flatten } + + override def doCrawl()(implicit req: RequestId) = { + val it = new AwsIterator { + override def next() = { + val response = backoffRequest { + ctx.awsClient.elbv2(true).describeTargetGroups(request.withMarker(this.nextToken.get)) + } + this.nextToken = Option(response.getNextMarker) + response.getTargetGroups.asScala.map( + item => Record(item.getTargetGroupName, ctx.beanMapper(item)) + ).toList + } + } + it.toList.flatten ++ (if(assumeRoleEnabled) doNonAssumeCrawl() else Seq.empty[Record]) + } } case class AwsInstanceHealthCrawlerState(elbRecords: Seq[Record] = Seq[Record]()) @@ -500,14 +689,14 @@ class AwsInstanceHealthCrawler(val name: String, val ctx: AwsCrawler.Context, va * @param elbRecords the records to crawl * @return the record set for the instanceHealth */ - def doCrawl(elbRecords: Seq[Record])(implicit req: RequestId): Seq[Record] = { + private def doNonAssumeCrawl(elbRecords: Seq[Record])(implicit req: RequestId): Seq[Record] = { val futures: Seq[java.util.concurrent.Future[Record]] = elbRecords.map( elb => { threadPool.submit( new Callable[Record] { def call() = { try { - val instances = backoffRequest { ctx.awsClient.elb.describeInstanceHealth(new DescribeInstanceHealthRequest(elb.id)).getInstanceStates } + val instances = backoffRequest { 
ctx.awsClient.elb().describeInstanceHealth(new DescribeInstanceHealthRequest(elb.id)).getInstanceStates } elb.copy(data = Map("name" -> elb.id, "instances" -> instances.asScala.map(ctx.beanMapper(_)))) } catch { case e: Exception => { @@ -541,6 +730,47 @@ class AwsInstanceHealthCrawler(val name: String, val ctx: AwsCrawler.Context, va records } + def doCrawl(elbRecords: Seq[Record])(implicit req: RequestId): Seq[Record] = { + val futures: Seq[java.util.concurrent.Future[Record]] = elbRecords.map( + elb => { + threadPool.submit( + new Callable[Record] { + def call() = { + try { + val instances = backoffRequest { ctx.awsClient.elb(true).describeInstanceHealth(new DescribeInstanceHealthRequest(elb.id)).getInstanceStates } + elb.copy(data = Map("name" -> elb.id, "instances" -> instances.asScala.map(ctx.beanMapper(_)))) + } catch { + case e: Exception => { + throw new java.lang.RuntimeException(this + " describeInstanceHealth failed for ELB " + elb.id, e) + } + } + } + } + ) + } + ) + var failed: Boolean = false + val records = futures.map( + f => { + try Some(f.get) + catch { + case e: Exception => { + failed = true + if (logger.isErrorEnabled) logger.error(s"$req$this exception from describeInstanceHealth", e) + None + } + } + } + ).collect { + case Some(rec) => rec + } + + if (failed) { + throw new java.lang.RuntimeException(s"$this failed to crawl instance health") + } + records ++ (if(assumeRoleEnabled) doNonAssumeCrawl(elbRecords) else Seq.empty[Record]) + } + protected override def initState = addInitialState(super.initState, newLocalState(AwsInstanceHealthCrawlerState())) protected override def init() { @@ -589,10 +819,10 @@ class AwsLaunchConfigurationCrawler(val name: String, val ctx: AwsCrawler.Contex val request = new DescribeLaunchConfigurationsRequest request.setMaxRecords(50) - override def doCrawl()(implicit req: RequestId) = { + private def doNonAssumeCrawl()(implicit req: RequestId) = { val it = new AwsIterator() { def next() = { - val response = 
backoffRequest { ctx.awsClient.asg.describeLaunchConfigurations(request.withNextToken(this.nextToken.get)) } + val response = backoffRequest { ctx.awsClient.asg().describeLaunchConfigurations(request.withNextToken(this.nextToken.get)) } this.nextToken = Option(response.getNextToken) response.getLaunchConfigurations.asScala.map( item => Record(item.getLaunchConfigurationName, new DateTime(item.getCreatedTime), ctx.beanMapper(item))).toList @@ -600,6 +830,18 @@ class AwsLaunchConfigurationCrawler(val name: String, val ctx: AwsCrawler.Contex } it.toList.flatten } + + override def doCrawl()(implicit req: RequestId) = { + val it = new AwsIterator() { + def next() = { + val response = backoffRequest { ctx.awsClient.asg(true).describeLaunchConfigurations(request.withNextToken(this.nextToken.get)) } + this.nextToken = Option(response.getNextToken) + response.getLaunchConfigurations.asScala.map( + item => Record(item.getLaunchConfigurationName, new DateTime(item.getCreatedTime), ctx.beanMapper(item))).toList + } + } + it.toList.flatten ++ (if(assumeRoleEnabled) doNonAssumeCrawl() else Seq.empty[Record]) + } } /** crawler for Network Interfaces @@ -610,14 +852,23 @@ class AwsLaunchConfigurationCrawler(val name: String, val ctx: AwsCrawler.Contex class AwsNetworkInterfaceCrawler(val name: String, val ctx: AwsCrawler.Context) extends Crawler { val request = new DescribeNetworkInterfacesRequest - override def doCrawl()(implicit req: RequestId) = { - val list = backoffRequest { ctx.awsClient.ec2.describeNetworkInterfaces(request).getNetworkInterfaces }.asScala.map( + private def doNonAssumeCrawl()(implicit req: RequestId) = { + val list = backoffRequest { ctx.awsClient.ec2().describeNetworkInterfaces(request).getNetworkInterfaces }.asScala.map( item => { Record(item.getNetworkInterfaceId, ctx.beanMapper(item)) } ) list } + + override def doCrawl()(implicit req: RequestId) = { + val list = backoffRequest { 
ctx.awsClient.ec2(true).describeNetworkInterfaces(request).getNetworkInterfaces }.asScala.map( + item => { + Record(item.getNetworkInterfaceId, ctx.beanMapper(item)) + } + ) + list ++ (if(assumeRoleEnabled) doNonAssumeCrawl() else Seq.empty[Record]) + } } /** crawler for Reservations (ie group of instances, not pre-paid reserved instances) @@ -629,9 +880,9 @@ class AwsReservationCrawler(val name: String, val ctx: AwsCrawler.Context) exten val request = new DescribeInstancesRequest lazy val abortWithoutTags = Utils.getProperty("edda.crawler", "abortWithoutTags", name, "false") - override def doCrawl()(implicit req: RequestId) = { + private def doNonAssumeCrawl()(implicit req: RequestId) = { var tagCount = 0 - val list = backoffRequest { ctx.awsClient.ec2.describeInstances(request).getReservations }.asScala.map( + val list = backoffRequest { ctx.awsClient.ec2().describeInstances(request).getReservations }.asScala.map( item => { tagCount += item.getInstances.asScala.map(_.getTags.size).sum Record(item.getReservationId, ctx.beanMapper(item)) @@ -641,6 +892,19 @@ class AwsReservationCrawler(val name: String, val ctx: AwsCrawler.Context) exten } list } + + override def doCrawl()(implicit req: RequestId) = { + var tagCount = 0 + val list = backoffRequest { ctx.awsClient.ec2(true).describeInstances(request).getReservations }.asScala.map( + item => { + tagCount += item.getInstances.asScala.map(_.getTags.size).sum + Record(item.getReservationId, ctx.beanMapper(item)) + }).toSeq + if (tagCount == 0 && abortWithoutTags.get.toBoolean) { + throw new java.lang.RuntimeException("no tags found for " + name + ", ignoring crawl results") + } + list ++ (if(assumeRoleEnabled) doNonAssumeCrawl() else Seq.empty[Record]) + } } case class AwsInstanceCrawlerState(reservationRecords: Seq[Record] = Seq[Record]()) @@ -721,9 +985,9 @@ class AwsSecurityGroupCrawler(val name: String, val ctx: AwsCrawler.Context) ext val request = new DescribeSecurityGroupsRequest lazy val abortWithoutTags = 
Utils.getProperty("edda.crawler", "abortWithoutTags", name, "false") - override def doCrawl()(implicit req: RequestId) = { + private def doNonAssumeCrawl()(implicit req: RequestId) = { var tagCount = 0 - val list = backoffRequest { ctx.awsClient.ec2.describeSecurityGroups(request).getSecurityGroups }.asScala.map( + val list = backoffRequest { ctx.awsClient.ec2().describeSecurityGroups(request).getSecurityGroups }.asScala.map( item => { tagCount += item.getTags.size Record(item.getGroupId, ctx.beanMapper(item)) @@ -733,6 +997,19 @@ class AwsSecurityGroupCrawler(val name: String, val ctx: AwsCrawler.Context) ext } list } + + override def doCrawl()(implicit req: RequestId) = { + var tagCount = 0 + val list = backoffRequest { ctx.awsClient.ec2(true).describeSecurityGroups(request).getSecurityGroups }.asScala.map( + item => { + tagCount += item.getTags.size + Record(item.getGroupId, ctx.beanMapper(item)) + }).toSeq + if (tagCount == 0 && abortWithoutTags.get.toBoolean) { + throw new java.lang.RuntimeException("no tags found for " + name + ", ignoring crawl results") + } + list ++ (if(assumeRoleEnabled) doNonAssumeCrawl() else Seq.empty[Record]) + } } /** crawler for Snapshots @@ -744,9 +1021,9 @@ class AwsSnapshotCrawler(val name: String, val ctx: AwsCrawler.Context) extends val request = new DescribeSnapshotsRequest lazy val abortWithoutTags = Utils.getProperty("edda.crawler", "abortWithoutTags", name, "false") - override def doCrawl()(implicit req: RequestId) = { + private def doNonAssumeCrawl()(implicit req: RequestId) = { var tagCount = 0 - val list = backoffRequest { ctx.awsClient.ec2.describeSnapshots(request).getSnapshots }.asScala.map( + val list = backoffRequest { ctx.awsClient.ec2().describeSnapshots(request).getSnapshots }.asScala.map( item => { tagCount += item.getTags.size Record(item.getSnapshotId, new DateTime(item.getStartTime), ctx.beanMapper(item)) @@ -756,6 +1033,19 @@ class AwsSnapshotCrawler(val name: String, val ctx: AwsCrawler.Context) extends } 
list } + + override def doCrawl()(implicit req: RequestId) = { + var tagCount = 0 + val list = backoffRequest { ctx.awsClient.ec2(true).describeSnapshots(request).getSnapshots }.asScala.map( + item => { + tagCount += item.getTags.size + Record(item.getSnapshotId, new DateTime(item.getStartTime), ctx.beanMapper(item)) + }).toSeq + if (tagCount == 0 && abortWithoutTags.get.toBoolean) { + throw new java.lang.RuntimeException("no tags found for " + name + ", ignoring crawl results") + } + list ++ (if(assumeRoleEnabled) doNonAssumeCrawl() else Seq.empty[Record]) + } } /** crawler for all Tags @@ -766,8 +1056,11 @@ class AwsSnapshotCrawler(val name: String, val ctx: AwsCrawler.Context) extends class AwsTagCrawler(val name: String, val ctx: AwsCrawler.Context) extends Crawler { val request = new DescribeTagsRequest - override def doCrawl()(implicit req: RequestId) = backoffRequest { ctx.awsClient.ec2.describeTags(request).getTags }.asScala.map( + private def doNonAssumeCrawl()(implicit req: RequestId) = backoffRequest { ctx.awsClient.ec2().describeTags(request).getTags }.asScala.map( item => Record(item.getKey + "|" + item.getResourceType + "|" + item.getResourceId, ctx.beanMapper(item))).toSeq + + override def doCrawl()(implicit req: RequestId) = backoffRequest { ctx.awsClient.ec2(true).describeTags(request).getTags }.asScala.map( + item => Record(item.getKey + "|" + item.getResourceType + "|" + item.getResourceId, ctx.beanMapper(item))).toSeq ++ (if(assumeRoleEnabled) doNonAssumeCrawl() else Seq.empty[Record]) } /** crawler for Volumes @@ -779,9 +1072,9 @@ class AwsVolumeCrawler(val name: String, val ctx: AwsCrawler.Context) extends Cr val request = new DescribeVolumesRequest lazy val abortWithoutTags = Utils.getProperty("edda.crawler", "abortWithoutTags", name, "false") - override def doCrawl()(implicit req: RequestId) = { + private def doNonAssumeCrawl()(implicit req: RequestId) = { var tagCount = 0 - val list = backoffRequest { 
ctx.awsClient.ec2.describeVolumes(request).getVolumes }.asScala.map( + val list = backoffRequest { ctx.awsClient.ec2().describeVolumes(request).getVolumes }.asScala.map( item => { tagCount += item.getTags.size Record(item.getVolumeId, new DateTime(item.getCreateTime), ctx.beanMapper(item)) @@ -791,6 +1084,19 @@ class AwsVolumeCrawler(val name: String, val ctx: AwsCrawler.Context) extends Cr } list } + + override def doCrawl()(implicit req: RequestId) = { + var tagCount = 0 + val list = backoffRequest { ctx.awsClient.ec2(true).describeVolumes(request).getVolumes }.asScala.map( + item => { + tagCount += item.getTags.size + Record(item.getVolumeId, new DateTime(item.getCreateTime), ctx.beanMapper(item)) + }).toSeq + if (tagCount == 0 && abortWithoutTags.get.toBoolean) { + throw new java.lang.RuntimeException("no tags found for " + name + ", ignoring crawl results") + } + list ++ (if(assumeRoleEnabled) doNonAssumeCrawl() else Seq.empty[Record]) + } } /** crawler for S3 Buckets @@ -801,8 +1107,15 @@ class AwsVolumeCrawler(val name: String, val ctx: AwsCrawler.Context) extends Cr class AwsBucketCrawler(val name: String, val ctx: AwsCrawler.Context) extends Crawler { val request = new ListBucketsRequest - override def doCrawl()(implicit req: RequestId) = backoffRequest { ctx.awsClient.s3.listBuckets(request) }.asScala.map( - item => Record(item.getName, new DateTime(item.getCreationDate), ctx.beanMapper(item))).toSeq + private def doNonAssumeCrawl()(implicit req: RequestId) = { + backoffRequest { ctx.awsClient.s3().listBuckets(request) }.asScala.map( + item => Record(item.getName, new DateTime(item.getCreationDate), ctx.beanMapper(item))).toSeq + } + + override def doCrawl()(implicit req: RequestId) = { + backoffRequest { ctx.awsClient.s3(true).listBuckets(request) }.asScala.map( + item => Record(item.getName, new DateTime(item.getCreationDate), ctx.beanMapper(item))).toSeq ++ (if(assumeRoleEnabled) doNonAssumeCrawl() else Seq.empty[Record]) + } } /** crawler for IAM 
Users @@ -815,19 +1128,19 @@ class AwsIamUserCrawler(val name: String, val ctx: AwsCrawler.Context) extends C private[this] val logger = LoggerFactory.getLogger(getClass) private[this] val threadPool = Executors.newFixedThreadPool(10) - override def doCrawl()(implicit req: RequestId) = { - val users = backoffRequest { ctx.awsClient.identitymanagement.listUsers(request).getUsers.asScala } + private def doNonAssumeCrawl()(implicit req: RequestId) = { + val users = backoffRequest { ctx.awsClient.identitymanagement().listUsers(request).getUsers.asScala } val futures: Seq[java.util.concurrent.Future[Record]] = users.map( user => { threadPool.submit( new Callable[Record] { def call() = { val groupsRequest = new ListGroupsForUserRequest().withUserName(user.getUserName) - val groups = backoffRequest { ctx.awsClient.identitymanagement.listGroupsForUser(groupsRequest).getGroups }.asScala.map( item => item.getGroupName ).toSeq + val groups = backoffRequest { ctx.awsClient.identitymanagement().listGroupsForUser(groupsRequest).getGroups }.asScala.map( item => item.getGroupName ).toSeq val accessKeysRequest = new ListAccessKeysRequest().withUserName(user.getUserName) - val accessKeys = Map[String, String]() ++ backoffRequest { ctx.awsClient.identitymanagement.listAccessKeys(accessKeysRequest).getAccessKeyMetadata }.asScala.map(item => ctx.beanMapper(item)).toSeq + val accessKeys = Map[String, String]() ++ backoffRequest { ctx.awsClient.identitymanagement().listAccessKeys(accessKeysRequest).getAccessKeyMetadata }.asScala.map(item => ctx.beanMapper(item)).toSeq val userPoliciesRequest = new ListUserPoliciesRequest().withUserName(user.getUserName) - val userPolicies = backoffRequest { ctx.awsClient.identitymanagement.listUserPolicies(userPoliciesRequest).getPolicyNames.asScala } + val userPolicies = backoffRequest { ctx.awsClient.identitymanagement().listUserPolicies(userPoliciesRequest).getPolicyNames.asScala } Record(user.getUserName, new DateTime(user.getCreateDate), Map("name" 
-> user.getUserName, "attributes" -> (ctx.beanMapper(user)), "groups" -> groups, "accessKeys" -> accessKeys, "userPolicies" -> userPolicies)) } } @@ -856,6 +1169,47 @@ class AwsIamUserCrawler(val name: String, val ctx: AwsCrawler.Context) extends C records } + override def doCrawl()(implicit req: RequestId) = { + val users = backoffRequest { ctx.awsClient.identitymanagement(true).listUsers(request).getUsers.asScala } + val futures: Seq[java.util.concurrent.Future[Record]] = users.map( + user => { + threadPool.submit( + new Callable[Record] { + def call() = { + val groupsRequest = new ListGroupsForUserRequest().withUserName(user.getUserName) + val groups = backoffRequest { ctx.awsClient.identitymanagement(true).listGroupsForUser(groupsRequest).getGroups }.asScala.map( item => item.getGroupName ).toSeq + val accessKeysRequest = new ListAccessKeysRequest().withUserName(user.getUserName) + val accessKeys = Map[String, String]() ++ backoffRequest { ctx.awsClient.identitymanagement(true).listAccessKeys(accessKeysRequest).getAccessKeyMetadata }.asScala.map(item => ctx.beanMapper(item)).toSeq + val userPoliciesRequest = new ListUserPoliciesRequest().withUserName(user.getUserName) + val userPolicies = backoffRequest { ctx.awsClient.identitymanagement(true).listUserPolicies(userPoliciesRequest).getPolicyNames.asScala } + Record(user.getUserName, new DateTime(user.getCreateDate), Map("name" -> user.getUserName, "attributes" -> (ctx.beanMapper(user)), "groups" -> groups, "accessKeys" -> accessKeys, "userPolicies" -> userPolicies)) + } + } + ) + } + ) + var failed = false + val records = futures.map( + f => { + try Some(f.get) + catch { + case e: Exception => { + if (logger.isErrorEnabled) logger.error(s"$req$this exception from IAM user sub requests", e) + failed = true + None + } + } + } + ).collect { + case Some(rec) => rec + } + + if (failed) { + throw new java.lang.RuntimeException(s"$this failed to crawl resource record sets") + } + records ++ (if(assumeRoleEnabled) 
doNonAssumeCrawl() else Seq.empty[Record]) + } + } /** crawler for IAM Groups @@ -868,15 +1222,15 @@ class AwsIamGroupCrawler(val name: String, val ctx: AwsCrawler.Context) extends private[this] val logger = LoggerFactory.getLogger(getClass) private[this] val threadPool = Executors.newFixedThreadPool(10) - override def doCrawl()(implicit req: RequestId) = { - val groups = backoffRequest { ctx.awsClient.identitymanagement.listGroups(request).getGroups.asScala } + private def doNonAssumeCrawl()(implicit req: RequestId) = { + val groups = backoffRequest { ctx.awsClient.identitymanagement().listGroups(request).getGroups.asScala } val futures: Seq[java.util.concurrent.Future[Record]] = groups.map( group => { threadPool.submit( new Callable[Record] { def call() = { val groupPoliciesRequest = new ListGroupPoliciesRequest().withGroupName(group.getGroupName) - val groupPolicies = backoffRequest { ctx.awsClient.identitymanagement.listGroupPolicies(groupPoliciesRequest).getPolicyNames.asScala.toSeq } + val groupPolicies = backoffRequest { ctx.awsClient.identitymanagement().listGroupPolicies(groupPoliciesRequest).getPolicyNames.asScala.toSeq } Record(group.getGroupName, new DateTime(group.getCreateDate), Map("name" -> group.getGroupName, "attributes" -> (ctx.beanMapper(group)), "policies" -> groupPolicies)) } } @@ -905,6 +1259,43 @@ class AwsIamGroupCrawler(val name: String, val ctx: AwsCrawler.Context) extends records } + override def doCrawl()(implicit req: RequestId) = { + val groups = backoffRequest { ctx.awsClient.identitymanagement(true).listGroups(request).getGroups.asScala } + val futures: Seq[java.util.concurrent.Future[Record]] = groups.map( + group => { + threadPool.submit( + new Callable[Record] { + def call() = { + val groupPoliciesRequest = new ListGroupPoliciesRequest().withGroupName(group.getGroupName) + val groupPolicies = backoffRequest { ctx.awsClient.identitymanagement(true).listGroupPolicies(groupPoliciesRequest).getPolicyNames.asScala.toSeq } + 
Record(group.getGroupName, new DateTime(group.getCreateDate), Map("name" -> group.getGroupName, "attributes" -> (ctx.beanMapper(group)), "policies" -> groupPolicies)) + } + } + ) + } + ) + var failed = false + val records = futures.map( + f => { + try Some(f.get) + catch { + case e: Exception => { + if (logger.isErrorEnabled) logger.error(s"$req$this exception from IAM listGroupPolicies", e) + failed = true + None + } + } + } + ).collect { + case Some(rec) => rec + } + + if (failed) { + throw new java.lang.RuntimeException(s"$this failed to crawl resource record sets") + } + records ++ (if(assumeRoleEnabled) doNonAssumeCrawl() else Seq.empty[Record]) + } + } /** crawler for IAM Roles @@ -915,8 +1306,11 @@ class AwsIamGroupCrawler(val name: String, val ctx: AwsCrawler.Context) extends class AwsIamRoleCrawler(val name: String, val ctx: AwsCrawler.Context) extends Crawler { val request = new ListRolesRequest - override def doCrawl()(implicit req: RequestId) = backoffRequest { ctx.awsClient.identitymanagement.listRoles(request).getRoles }.asScala.map( + private def doNonAssumeCrawl()(implicit req: RequestId) = backoffRequest { ctx.awsClient.identitymanagement().listRoles(request).getRoles }.asScala.map( item => Record(item.getRoleName, new DateTime(item.getCreateDate), ctx.beanMapper(item))).toSeq + + override def doCrawl()(implicit req: RequestId) = backoffRequest { ctx.awsClient.identitymanagement(true).listRoles(request).getRoles }.asScala.map( + item => Record(item.getRoleName, new DateTime(item.getCreateDate), ctx.beanMapper(item))).toSeq ++ (if(assumeRoleEnabled) doNonAssumeCrawl() else Seq.empty[Record]) } /** crawler for IAM policies @@ -928,18 +1322,31 @@ class AwsIamPolicyCrawler(val name: String, val ctx: AwsCrawler.Context) extends val request = new ListPoliciesRequest val vr = new GetPolicyVersionRequest - override def doCrawl()(implicit req: RequestId) = { - ctx.awsClient.identitymanagement.listPolicies(request).getPolicies.asScala.map( + private def 
doNonAssumeCrawl()(implicit req: RequestId) = { + ctx.awsClient.identitymanagement().listPolicies(request).getPolicies.asScala.map( item => { vr.setPolicyArn(item.getArn) vr.setVersionId(item.getDefaultVersionId) - val version = ctx.awsClient.identitymanagement.getPolicyVersion(vr).getPolicyVersion + val version = ctx.awsClient.identitymanagement().getPolicyVersion(vr).getPolicyVersion val data = ctx.beanMapper(item).asInstanceOf[Map[String, Any]] ++ Map("defaultDocument" -> version.getDocument) Record(item.getPolicyName, new DateTime(item.getUpdateDate), data) }).toSeq } + + override def doCrawl()(implicit req: RequestId) = { + ctx.awsClient.identitymanagement(true).listPolicies(request).getPolicies.asScala.map( + item => { + vr.setPolicyArn(item.getArn) + vr.setVersionId(item.getDefaultVersionId) + val version = ctx.awsClient.identitymanagement(true).getPolicyVersion(vr).getPolicyVersion + + val data = ctx.beanMapper(item).asInstanceOf[Map[String, Any]] ++ Map("defaultDocument" -> version.getDocument) + + Record(item.getPolicyName, new DateTime(item.getUpdateDate), data) + }).toSeq ++ (if(assumeRoleEnabled) doNonAssumeCrawl() else Seq.empty[Record]) + } } @@ -949,8 +1356,11 @@ class AwsIamPolicyCrawler(val name: String, val ctx: AwsCrawler.Context) extends class AwsIamPolicyVersionCrawler(val name: String, val ctx: AwsCrawler.Context) extends Crawler { val request = new ListPolicyVersionsRequest - override def doCrawl()(implicit req: RequestId) = ctx.awsClient.identitymanagement.listPolicyVersions(request).getVersions.asScala.map( + private def doNonAssumeCrawl()(implicit req: RequestId) = ctx.awsClient.identitymanagement().listPolicyVersions(request).getVersions.asScala.map( item => Record(item.getVersionId, new DateTime(item.getCreateDate), ctx.beanMapper(item))).toSeq + + override def doCrawl()(implicit req: RequestId) = ctx.awsClient.identitymanagement(true).listPolicyVersions(request).getVersions.asScala.map( + item => Record(item.getVersionId, new 
DateTime(item.getCreateDate), ctx.beanMapper(item))).toSeq ++ (if(assumeRoleEnabled) doNonAssumeCrawl() else Seq.empty[Record]) } /** crawler for IAM Virtual MFA Devices @@ -961,8 +1371,11 @@ class AwsIamPolicyVersionCrawler(val name: String, val ctx: AwsCrawler.Context) class AwsIamVirtualMFADeviceCrawler(val name: String, val ctx: AwsCrawler.Context) extends Crawler { val request = new ListVirtualMFADevicesRequest - override def doCrawl()(implicit req: RequestId) = backoffRequest { ctx.awsClient.identitymanagement.listVirtualMFADevices(request).getVirtualMFADevices }.asScala.map( + private def doNonAssumeCrawl()(implicit req: RequestId) = backoffRequest { ctx.awsClient.identitymanagement().listVirtualMFADevices(request).getVirtualMFADevices }.asScala.map( item => Record(item.getSerialNumber.split('/').last, new DateTime(item.getEnableDate), ctx.beanMapper(item))).toSeq + + override def doCrawl()(implicit req: RequestId) = backoffRequest { ctx.awsClient.identitymanagement(true).listVirtualMFADevices(request).getVirtualMFADevices }.asScala.map( + item => Record(item.getSerialNumber.split('/').last, new DateTime(item.getEnableDate), ctx.beanMapper(item))).toSeq ++ (if(assumeRoleEnabled) doNonAssumeCrawl() else Seq.empty[Record]) } /** crawler for SQS Queues @@ -978,8 +1391,8 @@ class AwsSimpleQueueCrawler(val name: String, val ctx: AwsCrawler.Context) exten private[this] val logger = LoggerFactory.getLogger(getClass) private[this] val threadPool = Executors.newFixedThreadPool(10) - override def doCrawl()(implicit req: RequestId) = { - val queues = backoffRequest { ctx.awsClient.sqs.listQueues(request).getQueueUrls.asScala } + private def doNonAssumeCrawl()(implicit req: RequestId) = { + val queues = backoffRequest { ctx.awsClient.sqs().listQueues(request).getQueueUrls.asScala } val futures: Seq[java.util.concurrent.Future[Record]] = queues.map( queueUrl => { threadPool.submit( @@ -987,7 +1400,7 @@ class AwsSimpleQueueCrawler(val name: String, val ctx: 
AwsCrawler.Context) exten def call() = { val name = queueUrl.split('/').last val attrRequest = new GetQueueAttributesRequest().withQueueUrl(queueUrl).withAttributeNames("All") - val attrs = Map[String, String]() ++ backoffRequest { ctx.awsClient.sqs.getQueueAttributes(attrRequest).getAttributes.asScala } + val attrs = Map[String, String]() ++ backoffRequest { ctx.awsClient.sqs().getQueueAttributes(attrRequest).getAttributes.asScala } val ctime = attrs.get("CreatedTimestamp") match { case Some(time) => new DateTime(time.toInt * 1000) case None => DateTime.now @@ -1034,6 +1447,63 @@ class AwsSimpleQueueCrawler(val name: String, val ctx: AwsCrawler.Context) exten } records } + + override def doCrawl()(implicit req: RequestId) = { + val queues = backoffRequest { ctx.awsClient.sqs(true).listQueues(request).getQueueUrls.asScala } + val futures: Seq[java.util.concurrent.Future[Record]] = queues.map( + queueUrl => { + threadPool.submit( + new Callable[Record] { + def call() = { + val name = queueUrl.split('/').last + val attrRequest = new GetQueueAttributesRequest().withQueueUrl(queueUrl).withAttributeNames("All") + val attrs = Map[String, String]() ++ backoffRequest { ctx.awsClient.sqs(true).getQueueAttributes(attrRequest).getAttributes.asScala } + val ctime = attrs.get("CreatedTimestamp") match { + case Some(time) => new DateTime(time.toInt * 1000) + case None => DateTime.now + } + + Record(name, ctime, Map("name" -> name, "url" -> queueUrl, "attributes" -> (attrs))) + } + } + ) + } + ) + var failed: Boolean = false + val records = futures.map( + f => { + try Some(f.get) + catch { + case e: java.util.concurrent.ExecutionException => { + e.getCause match { + case e: AmazonServiceException if e.getErrorCode == "AWS.SimpleQueueService.NonExistentQueue" => { + // this happens constantly, dont log it. 
There is a large time delta between queues being deleted + // but still showing up in the ListQueuesResult + None + } + case e: Throwable => { + if (logger.isErrorEnabled) logger.error(s"$req$this exception from SQS getQueueAttributes", e) + failed = true + None + } + } + } + case e: Throwable => { + if (logger.isErrorEnabled) logger.error(s"$req$this exception from SQS getQueueAttributes", e) + failed = true + None + } + } + } + ).collect { + case Some(rec) => rec + } + + if (failed) { + throw new java.lang.RuntimeException(s"$this failed to crawl resource record sets") + } + records ++ (if(assumeRoleEnabled) doNonAssumeCrawl() else Seq.empty[Record]) + } } /** crawler for ReservedInstance (ie pre-paid instances) @@ -1044,8 +1514,11 @@ class AwsSimpleQueueCrawler(val name: String, val ctx: AwsCrawler.Context) exten class AwsReservedInstanceCrawler(val name: String, val ctx: AwsCrawler.Context) extends Crawler { val request = new DescribeReservedInstancesRequest - override def doCrawl()(implicit req: RequestId) = backoffRequest { ctx.awsClient.ec2.describeReservedInstances(request).getReservedInstances }.asScala.map( + private def doNonAssumeCrawl()(implicit req: RequestId) = backoffRequest { ctx.awsClient.ec2().describeReservedInstances(request).getReservedInstances }.asScala.map( item => Record(item.getReservedInstancesId, new DateTime(item.getStart), ctx.beanMapper(item))).toSeq + + override def doCrawl()(implicit req: RequestId) = backoffRequest { ctx.awsClient.ec2(true).describeReservedInstances(request).getReservedInstances }.asScala.map( + item => Record(item.getReservedInstancesId, new DateTime(item.getStart), ctx.beanMapper(item))).toSeq ++ (if(assumeRoleEnabled) doNonAssumeCrawl() else Seq.empty[Record]) } /** crawler for ReservedInstancesOfferings @@ -1056,8 +1529,11 @@ class AwsReservedInstanceCrawler(val name: String, val ctx: AwsCrawler.Context) class AwsReservedInstancesOfferingCrawler(val name: String, val ctx: AwsCrawler.Context) extends Crawler { 
val request = new DescribeReservedInstancesOfferingsRequest - override def doCrawl()(implicit req: RequestId) = backoffRequest { ctx.awsClient.ec2.describeReservedInstancesOfferings(request).getReservedInstancesOfferings }.asScala.map( + private def doNonAssumeCrawl()(implicit req: RequestId) = backoffRequest { ctx.awsClient.ec2().describeReservedInstancesOfferings(request).getReservedInstancesOfferings }.asScala.map( item => Record(item.getReservedInstancesOfferingId, ctx.beanMapper(item))).toSeq + + override def doCrawl()(implicit req: RequestId) = backoffRequest { ctx.awsClient.ec2(true).describeReservedInstancesOfferings(request).getReservedInstancesOfferings }.asScala.map( + item => Record(item.getReservedInstancesOfferingId, ctx.beanMapper(item))).toSeq ++ (if(assumeRoleEnabled) doNonAssumeCrawl() else Seq.empty[Record]) } /** crawler for Route53 Hosted Zones (DNS records) @@ -1068,8 +1544,11 @@ class AwsReservedInstancesOfferingCrawler(val name: String, val ctx: AwsCrawler. class AwsHostedZoneCrawler(val name: String, val ctx: AwsCrawler.Context) extends Crawler { val request = new ListHostedZonesRequest - override def doCrawl()(implicit req: RequestId) = backoffRequest { ctx.awsClient.route53.listHostedZones(request).getHostedZones }.asScala.map( - item => Record(item.getName, ctx.beanMapper(item))).toSeq + private def doNonAssumeCrawl()(implicit req: RequestId) = backoffRequest { ctx.awsClient.route53().listHostedZones(request).getHostedZones }.asScala.map( + item => Record(item.getName, ctx.beanMapper(item))).toSeq + + override def doCrawl()(implicit req: RequestId) = backoffRequest { ctx.awsClient.route53(true).listHostedZones(request).getHostedZones }.asScala.map( + item => Record(item.getName, ctx.beanMapper(item))).toSeq ++ (if(assumeRoleEnabled) doNonAssumeCrawl() else Seq.empty[Record]) } case class AwsHostedRecordCrawlerState(hostedZones: Seq[Record] = Seq[Record]()) @@ -1100,7 +1579,7 @@ class AwsHostedRecordCrawler(val name: String, val ctx: 
AwsCrawler.Context, val * @param zones the records to crawl * @return the record set for the resourceRecordSet */ - def doCrawl(zones: Seq[Record])(implicit req: RequestId): Seq[Record] = { + private def doNonAssumeCrawl(zones: Seq[Record])(implicit req: RequestId): Seq[Record] = { val futures: Seq[java.util.concurrent.Future[Seq[Record]]] = zones.map( zone => { @@ -1112,7 +1591,7 @@ class AwsHostedRecordCrawler(val name: String, val ctx: AwsCrawler.Context, val def call() = { val it = new AwsIterator() { def next() = { - val response = backoffRequest { ctx.awsClient.route53.listResourceRecordSets(request.withStartRecordName(this.nextToken.get)) } + val response = backoffRequest { ctx.awsClient.route53().listResourceRecordSets(request.withStartRecordName(this.nextToken.get)) } this.nextToken = Option(response.getNextRecordName) response.getResourceRecordSets.asScala.map( item => { @@ -1149,6 +1628,55 @@ class AwsHostedRecordCrawler(val name: String, val ctx: AwsCrawler.Context, val records } + def doCrawl(zones: Seq[Record])(implicit req: RequestId): Seq[Record] = { + + val futures: Seq[java.util.concurrent.Future[Seq[Record]]] = zones.map( + zone => { + val zoneId = zone.data.asInstanceOf[Map[String,Any]]("id").asInstanceOf[String] + val zoneName = zone.id + val request = new ListResourceRecordSetsRequest(zoneId) + threadPool.submit( + new Callable[Seq[Record]] { + def call() = { + val it = new AwsIterator() { + def next() = { + val response = backoffRequest { ctx.awsClient.route53(true).listResourceRecordSets(request.withStartRecordName(this.nextToken.get)) } + this.nextToken = Option(response.getNextRecordName) + response.getResourceRecordSets.asScala.map( + item => { + Record(item.getName, ctx.beanMapper(item).asInstanceOf[Map[String,Any]] ++ Map("zone" -> Map("id" -> zoneId, "name" -> zoneName))) + } + ).toList + } + } + it.toList.flatten + } + } + ) + } + ) + var failed: Boolean = false + val records = futures.map( + f => { + try Some(f.get) + catch { + case 
e: Exception => { + failed = true + if (logger.isErrorEnabled) logger.error(s"$req$this exception from listResourceRecordSets", e) + None + } + } + } + ).collect({ + case Some(rec) => rec + }).flatten + + if (failed) { + throw new java.lang.RuntimeException(s"$this failed to crawl resource record sets") + } + records ++ (if(assumeRoleEnabled) doNonAssumeCrawl(zones) else Seq.empty[Record]) + } + protected override def initState = addInitialState(super.initState, newLocalState(AwsHostedRecordCrawlerState())) protected override def init() { @@ -1191,11 +1719,11 @@ class AwsDatabaseCrawler(val name: String, val ctx: AwsCrawler.Context) extends val request = new DescribeDBInstancesRequest request.setMaxRecords(50) - override def doCrawl()(implicit req: RequestId) = { + private def doNonAssumeCrawl()(implicit req: RequestId) = { val it = new AwsIterator() { def next() = { // annoying, describeDBInstances has withMarker and getMarker instead if withToken and getNextToken - val response = backoffRequest { ctx.awsClient.rds.describeDBInstances(request.withMarker(this.nextToken.get)) } + val response = backoffRequest { ctx.awsClient.rds().describeDBInstances(request.withMarker(this.nextToken.get)) } this.nextToken = Option(response.getMarker) response.getDBInstances.asScala.map( item => { @@ -1209,19 +1737,52 @@ class AwsDatabaseCrawler(val name: String, val ctx: AwsCrawler.Context) extends val initial = it.toList.flatten var buffer = new ListBuffer[Record]() for (rec <- initial) { - val data = rec.toMap("data").asInstanceOf[Map[String,String]] - val arn = ctx.awsClient.arn("rds", "db", data("DBInstanceIdentifier")) + val data = rec.toMap("data").asInstanceOf[Map[String,String]] + val arn = ctx.awsClient.arn("rds", "db", data("DBInstanceIdentifier")) + + val request = new ListTagsForResourceRequest().withResourceName(arn) + val response = backoffRequest { ctx.awsClient.rds().listTagsForResource(request) } + val responseList = response.getTagList.asScala.map( + item => { + 
ctx.beanMapper(item) + }).toList + + buffer += rec.copy(data = data.asInstanceOf[Map[String,Any]] ++ Map("arn" -> arn, "tags" -> responseList)) + } + buffer.toList + } - val request = new ListTagsForResourceRequest().withResourceName(arn) - val response = backoffRequest { ctx.awsClient.rds.listTagsForResource(request) } - val responseList = response.getTagList.asScala.map( + override def doCrawl()(implicit req: RequestId) = { + val it = new AwsIterator() { + def next() = { + // annoying, describeDBInstances has withMarker and getMarker instead if withToken and getNextToken + val response = backoffRequest { ctx.awsClient.rds(true).describeDBInstances(request.withMarker(this.nextToken.get)) } + this.nextToken = Option(response.getMarker) + response.getDBInstances.asScala.map( item => { - ctx.beanMapper(item) + Record(item.getDBInstanceIdentifier, ctx.beanMapper(item)) }).toList + } + } + + backoffRequest { ctx.awsClient.loadAccountNum() } + + val initial = it.toList.flatten + var buffer = new ListBuffer[Record]() + for (rec <- initial) { + val data = rec.toMap("data").asInstanceOf[Map[String,String]] + val arn = ctx.awsClient.arn("rds", "db", data("DBInstanceIdentifier")) + + val request = new ListTagsForResourceRequest().withResourceName(arn) + val response = backoffRequest { ctx.awsClient.rds(true).listTagsForResource(request) } + val responseList = response.getTagList.asScala.map( + item => { + ctx.beanMapper(item) + }).toList - buffer += rec.copy(data = data.asInstanceOf[Map[String,Any]] ++ Map("arn" -> arn, "tags" -> responseList)) + buffer += rec.copy(data = data.asInstanceOf[Map[String,Any]] ++ Map("arn" -> arn, "tags" -> responseList)) } - buffer.toList + buffer.toList ++ (if(assumeRoleEnabled) doNonAssumeCrawl() else Seq.empty[Record]) } } @@ -1231,11 +1792,17 @@ class AwsDatabaseCrawler(val name: String, val ctx: AwsCrawler.Context) extends * @param ctx context to provide beanMapper */ class AwsDatabaseSubnetCrawler(val name: String, val ctx: 
AwsCrawler.Context) extends Crawler { - override def doCrawl()(implicit req: RequestId) = - ctx.awsClient.rds.describeDBSubnetGroups().getDBSubnetGroups.asScala.map( + private def doNonAssumeCrawl()(implicit req: RequestId) = + ctx.awsClient.rds().describeDBSubnetGroups().getDBSubnetGroups.asScala.map( item => { Record(item.getDBSubnetGroupName, ctx.beanMapper(item)) }) + + override def doCrawl()(implicit req: RequestId) = + ctx.awsClient.rds(true).describeDBSubnetGroups().getDBSubnetGroups.asScala.map( + item => { + Record(item.getDBSubnetGroupName, ctx.beanMapper(item)) + }) ++ (if(assumeRoleEnabled) doNonAssumeCrawl() else Seq.empty[Record]) } /** crawler for ElastiCache Clusters @@ -1246,8 +1813,11 @@ class AwsDatabaseSubnetCrawler(val name: String, val ctx: AwsCrawler.Context) ex class AwsCacheClusterCrawler(val name: String, val ctx: AwsCrawler.Context) extends Crawler { val request = new DescribeCacheClustersRequest - override def doCrawl()(implicit req: RequestId) = backoffRequest { ctx.awsClient.elasticache.describeCacheClusters(request).getCacheClusters }.asScala.map( + private def doNonAssumeCrawl()(implicit req: RequestId) = backoffRequest { ctx.awsClient.elasticache().describeCacheClusters(request).getCacheClusters }.asScala.map( item => Record(item.getCacheClusterId, ctx.beanMapper(item))).toSeq + + override def doCrawl()(implicit req: RequestId) = backoffRequest { ctx.awsClient.elasticache(true).describeCacheClusters(request).getCacheClusters }.asScala.map( + item => Record(item.getCacheClusterId, ctx.beanMapper(item))).toSeq ++ (if(assumeRoleEnabled) doNonAssumeCrawl() else Seq.empty[Record]) } /** crawler for Subnets @@ -1259,9 +1829,9 @@ class AwsSubnetCrawler(val name: String, val ctx: AwsCrawler.Context) extends Cr val request = new DescribeSubnetsRequest lazy val abortWithoutTags = Utils.getProperty("edda.crawler", "abortWithoutTags", name, "false") - override def doCrawl()(implicit req: RequestId) = { + private def doNonAssumeCrawl()(implicit 
req: RequestId) = { var tagCount = 0 - val list = backoffRequest { ctx.awsClient.ec2.describeSubnets(request).getSubnets }.asScala.map( + val list = backoffRequest { ctx.awsClient.ec2().describeSubnets(request).getSubnets }.asScala.map( item => { tagCount += item.getTags.size Record(item.getSubnetId, ctx.beanMapper(item)) @@ -1271,6 +1841,19 @@ class AwsSubnetCrawler(val name: String, val ctx: AwsCrawler.Context) extends Cr } list } + + override def doCrawl()(implicit req: RequestId) = { + var tagCount = 0 + val list = backoffRequest { ctx.awsClient.ec2(true).describeSubnets(request).getSubnets }.asScala.map( + item => { + tagCount += item.getTags.size + Record(item.getSubnetId, ctx.beanMapper(item)) + }).toSeq + if (tagCount == 0 && abortWithoutTags.get.toBoolean) { + throw new java.lang.RuntimeException("no tags found for " + name + ", ignoring crawl results") + } + list ++ (if(assumeRoleEnabled) doNonAssumeCrawl() else Seq.empty[Record]) + } } /** crawler for Cloudformation Stacks @@ -1282,13 +1865,13 @@ class AwsCloudformationCrawler(val name: String, val ctx: AwsCrawler.Context) ex private[this] val logger = LoggerFactory.getLogger(getClass) private[this] val threadPool = Executors.newFixedThreadPool(1) - private def getStacksFromAws: List[Stack] = { + private def getStacksFromAws(client : AmazonCloudFormationClient): List[Stack] = { val stacks = List.newBuilder[Stack] val request = new DescribeStacksRequest var token: String = null do { val response = backoffRequest { - ctx.awsClient.cloudformation.describeStacks(request.withNextToken(token)) + client.describeStacks(request.withNextToken(token)) } stacks ++= response.getStacks.asScala token = response.getNextToken @@ -1296,15 +1879,15 @@ class AwsCloudformationCrawler(val name: String, val ctx: AwsCrawler.Context) ex stacks.result } - override def doCrawl()(implicit req: RequestId) = { - val stacks = getStacksFromAws + private def doNonAssumeCrawl()(implicit req: RequestId) = { + val stacks = 
getStacksFromAws(ctx.awsClient.cloudformation()) val futures: Seq[java.util.concurrent.Future[Record]] = stacks.map( stack => { this.threadPool.submit( new Callable[Record] { def call() = { val stackResourcesRequest = new ListStackResourcesRequest().withStackName(stack.getStackName) - val stackResources = backoffRequest { ctx.awsClient.cloudformation.listStackResources(stackResourcesRequest).getStackResourceSummaries.asScala.map(item => ctx.beanMapper(item)) } + val stackResources = backoffRequest { ctx.awsClient.cloudformation().listStackResources(stackResourcesRequest).getStackResourceSummaries.asScala.map(item => ctx.beanMapper(item)) } Record(stack.getStackName, new DateTime(stack.getCreationTime), ctx.beanMapper(stack).asInstanceOf[Map[String,Any]] ++ Map("resources" -> stackResources)) } } @@ -1327,4 +1910,35 @@ class AwsCloudformationCrawler(val name: String, val ctx: AwsCrawler.Context) ex records } -} + override def doCrawl()(implicit req: RequestId) = { + val stacks = getStacksFromAws(ctx.awsClient.cloudformation(true)) + val futures: Seq[java.util.concurrent.Future[Record]] = stacks.map( + stack => { + this.threadPool.submit( + new Callable[Record] { + def call() = { + val stackResourcesRequest = new ListStackResourcesRequest().withStackName(stack.getStackName) + val stackResources = backoffRequest { ctx.awsClient.cloudformation(true).listStackResources(stackResourcesRequest).getStackResourceSummaries.asScala.map(item => ctx.beanMapper(item)) } + Record(stack.getStackName, new DateTime(stack.getCreationTime), ctx.beanMapper(stack).asInstanceOf[Map[String,Any]] ++ Map("resources" -> stackResources)) + } + } + ) + } + ) + val records = futures.map( + f => { + try Some(f.get) + catch { + case e: Exception => { + if (logger.isErrorEnabled) logger.error(this + "exception from Cloudformation listStackResources", e) + None + } + } + } + ).collect { + case Some(rec) => rec + } + records ++ (if(assumeRoleEnabled) doNonAssumeCrawl() else Seq.empty[Record]) + } + 
+} \ No newline at end of file diff --git a/src/main/scala/com/netflix/edda/aws/DynamoDBElector.scala b/src/main/scala/com/netflix/edda/aws/DynamoDBElector.scala index d76e66ee..dfe96b8f 100644 --- a/src/main/scala/com/netflix/edda/aws/DynamoDBElector.scala +++ b/src/main/scala/com/netflix/edda/aws/DynamoDBElector.scala @@ -44,9 +44,9 @@ class DynamoDBElector extends Elector { private var inited = false - val readDynamo = new AwsClient(account).dynamo + val readDynamo = new AwsClient(account).dynamo(true) val writeDynamo = { - val client = new AwsClient(account).dynamo + val client = new AwsClient(account).dynamo(true) client } diff --git a/src/main/scala/com/netflix/edda/aws/S3CurrentDatastore.scala b/src/main/scala/com/netflix/edda/aws/S3CurrentDatastore.scala index 60730b07..bbe3c48d 100644 --- a/src/main/scala/com/netflix/edda/aws/S3CurrentDatastore.scala +++ b/src/main/scala/com/netflix/edda/aws/S3CurrentDatastore.scala @@ -55,14 +55,14 @@ class S3CurrentDatastore(val name: String) extends Datastore { } } - lazy val s3 = new AwsClient(account).s3 - val readDynamo = new AwsClient(account).dynamo + lazy val s3 = new AwsClient(account).s3(true) + val readDynamo = new AwsClient(account).dynamo(true) // disable retry's when writing to dynamo ... if initial request // gets a timeout we need to know as it will likely complete eventually // and then all subsequent conditional updates will fail since will be out // of sync with the datastore val writeDynamo = { - val client = new AwsClient(account).dynamo + val client = new AwsClient(account).dynamo(true) client }