Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#531 Add the ability to restrict domains of emails that can be used as email recipients #532

Merged
merged 4 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 47 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -237,13 +237,59 @@ Let's take a look on components of a data pipeline in more detail.

## Pipeline components

A pipeline consists of _sources_, _the metastore_ and _sinks_.
A pipeline consists of _common options_, _sources_, _the metastore_, _sinks_, and _operations_. All these
definitions form the workflow config. For big pipelines these definitions can be split among multiple files. Check out
`examples/` folder for example workflow definitions. Let's take a look at each section of a workflow separately.

Currently there are 3 types of jobs:
- _Ingestion_ jobs to get data from external sources to the metastore.
- _Transformation jobs_ to transform data inside the metastore.
- _Sink_ jobs to send data from the metastore to external systems.

### Common options
Pramen pipeline should have several options defined. Here is the minimum configuration. For the list of all options
and their default values check out [reference.conf](pramen/core/src/main/resources/reference.conf).

```hocon
pramen {
environment.name = "AWS Glue (DEV)"
pipeline.name = "CDC PoC"

bookkeeping.enabled = true
bookkeeping.jdbc {
driver = "org.postgresql.Driver"
url = "jdbc:postgresql://myhost:5432/pramen_database"
user = "postgresql_user"
password = "password"
}
temporary.directory = "s3://bucket/prefix/tmp/"
}
```

#### Email notifications
One section of config defines options for email notifications. You can define
```hocon
mail {
# SMTP configuration
# Any options from https://javaee.github.io/javamail/docs/api/com/sun/mail/smtp/package-summary.html
smtp.host = "smtp.example.com"
smtp.port = "25"
smtp.auth = "false"
smtp.starttls.enable = "false"
smtp.EnableSSL.enable = "false"
debug = "false"

# A custom email sender (optional)
send.from = "Pramen <[email protected]"

# Email recipients
send.to = "[email protected], [email protected]"

# A list of allowed domains (optional)
allowed.domains = [ "example.com", "test.com" ]
}
```

### Dates
Before diving into pipeline definition it is important to understand how dates are handled. Pramen is a batch data
pipeline manager for input data updates coming from applications which are usually referred to as _source systems_. Pramen is designed
Expand Down
3 changes: 3 additions & 0 deletions pramen/core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ mail {

send.from = "Pramen <[email protected]>"
send.to = ""

# The list of allowed domains as email targets. If empty, all valid email addresses are allowed
allowed.domains = []
}

hadoop.redacted.tokens = [ password, secret, session.token, access.key ]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ object Keys {
val MAIL_FROM = "mail.send.from"
val MAIL_TO = "mail.send.to"
val MAIL_FAILURES_TO = "mail.send.failures.to"
val MAIL_ALLOWED_DOMAINS = "mail.allowed.domains"

val HADOOP_REDACT_TOKENS = "hadoop.redacted.tokens"
val HADOOP_OPTION_PREFIX = "hadoop.option"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package za.co.absa.pramen.core.notify
import com.typesafe.config.Config
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.notification.NotificationEntry
import za.co.absa.pramen.core.config.Keys
import za.co.absa.pramen.core.utils.ConfigUtils
import za.co.absa.pramen.core.utils.Emoji._

Expand All @@ -28,16 +29,6 @@ import javax.mail.{Message, Session, Transport}
import scala.util.control.NonFatal

object Sendable {
// Configuration keys
val MAIL_SEND_FROM_KEY = "mail.send.from"
val MAIL_SEND_TO_KEY = "mail.send.to"

val MAIL_SMTP_HOST_KEY = "mail.smtp.host"
val MAIL_SMTP_PORT_KEY = "mail.smtp.port"
val MAIL_SMTP_AUTH_KEY = "mail.smtp.auth"
val MAIL_SMTP_LOCALHOST = "mail.smtp.localhost"
val MAIL_SMTP_STARTTLS_ENABLE_KEY = "mail.smtp.starttls.enable"
val MAIL_SMTP_SSL_ENABLE_KEY = "mail.smtp.EnableSSL.enable"
val MAIL_DEBUG_KEY = "mail.debug"
}

Expand All @@ -52,9 +43,9 @@ trait Sendable {

def getFormat: String = "html"

def getFrom: String = getConfig.getString(MAIL_SEND_FROM_KEY)
def getFrom: String

def getTo: String = getConfig.getString(MAIL_SEND_TO_KEY)
def getTo: String

def getSubject: String

Expand All @@ -63,10 +54,10 @@ trait Sendable {
def getFiles: Seq[NotificationEntry.AttachedFile] = Seq()

final def send(): Unit = {
if (getConfig.hasPath(MAIL_SEND_TO_KEY) && getConfig.getString(MAIL_SEND_TO_KEY).trim.nonEmpty) {
if (getTo.nonEmpty) {
doSend()
} else {
log.info(s"No senders are configured at ($MAIL_SEND_TO_KEY). The notification email won't be sent.")
log.info(s"No senders are configured at ('${Keys.MAIL_TO}', '${Keys.MAIL_FAILURES_TO}'). The notification email won't be sent.")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,6 @@ trait PipelineNotificationBuilder {
def addCustomEntries(entries: Seq[NotificationEntry]): Unit

def addSignature(signature: TextElement*): Unit

def addValidatedEmails(validatedEmails: ValidatedEmails): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
var isDryRun = false
var isUndercover = false
var customSignature = Seq.empty[TextElement]
var validatedEmailsOpt: Option[ValidatedEmails] = None

val completedTasks = new ListBuffer[TaskResult]
val pipelineNotificationFailures = new ListBuffer[PipelineNotificationFailure]
Expand Down Expand Up @@ -124,6 +125,8 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot

override def addSignature(signature: TextElement*): Unit = customSignature = signature

override def addValidatedEmails(validatedEmailsIn: ValidatedEmails): Unit = validatedEmailsOpt = Option(validatedEmailsIn)

def renderSubject(): String = {
val timeCreatedStr = ZonedDateTime.now(zoneId).format(timestampFmt)

Expand All @@ -142,6 +145,8 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot

renderHeader(builder)

renderValidatedEmails(builder)

renderCompletedTasks(builder)

val allSchemaChanges = completedTasks
Expand Down Expand Up @@ -776,6 +781,26 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
}
}

private def renderValidatedEmails(builder: MessageBuilderHtml): Unit = {
validatedEmailsOpt.foreach { validatedEmails =>
if (validatedEmails.invalidFormatEmails.nonEmpty) {
val emailText = TextElement(validatedEmails.invalidFormatEmails.mkString(", "), Style.Error)
builder.withParagraph(Seq(
TextElement("Warning! ", Style.Error),
TextElement(s"Some email recipients are not proper emails: ", Style.Exception)
) :+ emailText)
}

if (validatedEmails.invalidDomainEmails.nonEmpty) {
val emailText = TextElement(validatedEmails.invalidDomainEmails.mkString(", "), Style.Error)
builder.withParagraph(Seq(
TextElement("Warning! ", Style.Error),
TextElement(s"Some email recipients have domain names that are not allowed: ", Style.Exception)
) :+ emailText)
}
}
}

private[core] def getTransientTextStyle(task: TaskResult): Style = {
if (task.isTransient)
Style.Italic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ object PipelineNotificationDirector {
* Apply the builder steps in order to create a formatted notification.
*/
def build(notificationBuilder: PipelineNotificationBuilder,
notification: PipelineNotification)
notification: PipelineNotification,
validatedEmails: ValidatedEmails)
(implicit conf: Config): PipelineNotificationBuilder = {
val minRps = conf.getInt(Keys.WARN_THROUGHPUT_RPS)
val goodRps = conf.getInt(Keys.GOOD_THROUGHPUT_RPS)
Expand Down Expand Up @@ -56,6 +57,7 @@ object PipelineNotificationDirector {

notificationBuilder.addCustomEntries(notification.customEntries)
notificationBuilder.addSignature(notification.customSignature: _*)
notificationBuilder.addValidatedEmails(validatedEmails)

notificationBuilder
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,32 +21,80 @@ import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.notification.NotificationEntry
import za.co.absa.pramen.core.config.Keys
import za.co.absa.pramen.core.notify.Sendable
import za.co.absa.pramen.core.utils.{ConfigUtils, Emoji}

import scala.collection.mutable.ListBuffer

class PipelineNotificationEmail(notification: PipelineNotification)
(implicit conf: Config) extends Sendable {

import PipelineNotificationEmail._

private val log = LoggerFactory.getLogger(this.getClass)

private val notificationBuilder = new PipelineNotificationBuilderHtml
private val allowedDomains = getDomainList(ConfigUtils.getOptListStrings(conf, Keys.MAIL_ALLOWED_DOMAINS))

PipelineNotificationDirector.build(notificationBuilder, notification)
private lazy val validatedEmails = {
val validatedEmails = validateRecipientEmails(getEmailRecipients, allowedDomains)

def getSubject: String = {
notificationBuilder.renderSubject()
validatedEmails.invalidFormatEmails.foreach(email =>
log.error(s"${Emoji.FAILURE} Invalid email format: $email")
)

validatedEmails.invalidDomainEmails.foreach(email =>
log.error(s"${Emoji.FAILURE} Invalid email domain: $email")
)

if (validatedEmails.invalidFormatEmails.nonEmpty || validatedEmails.invalidDomainEmails.nonEmpty) {
if (validatedEmails.validEmails.nonEmpty) {
log.warn(s"${Emoji.WARNING} Sending the notification to valid emails only: ${validatedEmails.validEmails.mkString(", ")}")
} else {
log.error(s"${Emoji.FAILURE} No valid emails found. The notification will not be sent.")
}
}

validatedEmails
}

private lazy val notificationBuilder = {
val builder = new PipelineNotificationBuilderHtml

PipelineNotificationDirector.build(builder, notification, validatedEmails)
builder
}

override def getConfig: Config = conf

override def getFrom: String = conf.getString(Keys.MAIL_FROM)

override def getTo: String = {
validatedEmails.validEmails.mkString(", ")
}

override def getSubject: String = {
notificationBuilder.renderSubject()
}

override def getBody: String = {
notificationBuilder.renderBody()
}

override def getFiles: Seq[NotificationEntry.AttachedFile] = {
notificationBuilder.customEntries.flatMap {
case NotificationEntry.AttachedFile(name, content) => Some(NotificationEntry.AttachedFile(name, content))
case _ => None
}.toSeq
}

private[core] def getEmailRecipients: String = {
if (conf.hasPath(Keys.MAIL_FAILURES_TO)) {
if (notification.exception.isDefined || notification.tasksCompleted.exists(t => t.runStatus.isFailure)) {
val to = conf.getString(Keys.MAIL_FAILURES_TO)
log.warn(s"Sending failures to the special mail list: $to")
to
} else {
val to = conf.getString(Keys.MAIL_TO)
log.warn(s"Sending success to the normal mail list: $to")
log.info(s"Sending success to the normal mail list: $to")
to
}
} else {
Expand All @@ -55,15 +103,56 @@ class PipelineNotificationEmail(notification: PipelineNotification)
to
}
}
}

override def getBody: String = {
notificationBuilder.renderBody()
object PipelineNotificationEmail {
def validateRecipientEmails(emails: String, allowedDomains: Seq[String]): ValidatedEmails = {
val allEmails = splitEmails(emails)

val validEmails = new ListBuffer[String]
val invalidFormatEmails = new ListBuffer[String]
val invalidDomainEmails = new ListBuffer[String]

allEmails.foreach { email =>
if (isEmailProperlyFormed(email)) {
if (isEmailDomainAllowed(email, allowedDomains)) {
validEmails += email
} else {
invalidDomainEmails += email
}
} else {
invalidFormatEmails += email
}
}

ValidatedEmails(validEmails.toList, invalidFormatEmails.toList, invalidDomainEmails.toList)
}

override def getFiles: Seq[NotificationEntry.AttachedFile] = {
notificationBuilder.customEntries.flatMap {
case NotificationEntry.AttachedFile(name, content) => Some(NotificationEntry.AttachedFile(name, content))
case _ => None
}.toSeq
private[core] def getDomainList(domains: Seq[String]): Seq[String] = {
domains.map(_.toLowerCase().trim())
.map { domain =>
if (domain.startsWith("@"))
domain.tail
else
domain
}
}

private[core] def splitEmails(emails: String): Seq[String] = {
emails.split("[,;]").map(_.trim).filter(_.nonEmpty)
}

private[core] def isEmailProperlyFormed(email: String): Boolean = {
val emailRegex = "^[^@]+@[^@]+$".r
emailRegex.findFirstMatchIn(email).isDefined
}

private[core] def isEmailDomainAllowed(email: String, allowedDomains: Seq[String]): Boolean = {
if (allowedDomains.isEmpty) {
true
} else {
val domain = email.split("@").last.toLowerCase
allowedDomains.contains(domain)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.core.notify.pipeline

case class ValidatedEmails(
validEmails: Seq[String],
invalidFormatEmails: Seq[String],
invalidDomainEmails: Seq[String]
)
1 change: 1 addition & 0 deletions pramen/core/src/test/resources/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ log4j.appender.console.Threshold=ERROR
log4j.logger.za.co.absa.pramen.core.mocks.DummyProcessRunner=OFF
log4j.logger.za.co.absa.pramen.core.mocks.job.JobBaseDummy=OFF
log4j.logger.za.co.absa.pramen.core.notify.SyncNotificationEmail=OFF
log4j.logger.za.co.absa.pramen.core.notify.pipeline.PipelineNotificationEmail=OFF
log4j.logger.za.co.absa.pramen.core.reader.TableReaderJdbc=OFF
log4j.logger.za.co.absa.pramen.core.runner.AppRunner$=OFF
log4j.logger.za.co.absa.pramen.core.runner.jobrunner.ConcurrentJobRunnerImpl=OFF
Expand Down
2 changes: 2 additions & 0 deletions pramen/core/src/test/resources/log4j2.properties
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ logger.jobbasedummy.level = OFF

logger.syncnotificationemail.name = za.co.absa.pramen.core.notify.SyncNotificationEmail
logger.syncnotificationemail.level = OFF
logger.pipelinenotificationemail.name=za.co.absa.pramen.core.notify.pipeline.PipelineNotificationEmail
logger.pipelinenotificationemail.level=OFF

logger.tablereaderjdbc.name = za.co.absa.pramen.core.reader.TableReaderJdbc
logger.tablereaderjdbc.level = OFF
Expand Down
Loading
Loading