diff --git a/nuclio/triggers.py b/nuclio/triggers.py index a4eed84..505f39b 100644 --- a/nuclio/triggers.py +++ b/nuclio/triggers.py @@ -173,6 +173,10 @@ def __init__( initial_offset="earliest", explicit_ack_mode=None, extra_attributes=None, + session_timeout: str = "10s", + heartbeat_interval: str = "3s", + worker_allocation_mode: str = "pool", + fetch_default: int = 1048576, ): super(KafkaTrigger, self).__init__( { @@ -183,10 +187,10 @@ def __init__( "brokers": brokers, "consumerGroup": consumer_group, "initialOffset": initial_offset, - "sessionTimeout": "10s", - "heartbeatInterval": "3s", - "workerAllocationMode": "pool", - "fetchDefault": 1048576, + "sessionTimeout": session_timeout, + "heartbeatInterval": heartbeat_interval, + "workerAllocationMode": worker_allocation_mode, + "fetchDefault": fetch_default, }, } ) @@ -195,6 +199,12 @@ def __init__( self._struct["attributes"]["partitions"] = partitions if explicit_ack_mode: self._struct["explicitAckMode"] = explicit_ack_mode + # workerAllocationMode conflicts with explicit_ack_mode, so we should force static one in that case + if not extra_attributes: + extra_attributes = {"workerAllocationMode": "static"} + else: + extra_attributes["workerAllocationMode"] = "static" + self._add_extra_attrs(extra_attributes) def sasl(self, user="", password=""): @@ -266,6 +276,12 @@ def __init__( struct["attributes"]["pollingIntervalMs"] = pollingIntervalMS if explicit_ack_mode: struct["explicitAckMode"] = explicit_ack_mode + # workerAllocationMode conflicts with explicit_ack_mode, so we should force static one in that case + if not extra_attributes: + extra_attributes = {"workerAllocationMode": "static"} + else: + extra_attributes["workerAllocationMode"] = "static" + access_key = access_key if access_key else environ.get("V3IO_ACCESS_KEY") if not access_key: raise ValueError(