Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Explicitly marked private code with underscore prefixes. #642

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 26 additions & 24 deletions baseplate/clients/cassandra.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,12 @@ class CassandraClient(config.Parser):
"""

def __init__(self, keyspace: str, **kwargs: Any):
self.keyspace = keyspace
self.kwargs = kwargs
self._keyspace = keyspace
self._kwargs = kwargs

def parse(self, key_path: str, raw_config: config.RawConfig) -> "CassandraContextFactory":
cluster = cluster_from_config(raw_config, prefix=f"{key_path}.", **self.kwargs)
session = cluster.connect(keyspace=self.keyspace)
cluster = cluster_from_config(raw_config, prefix=f"{key_path}.", **self._kwargs)
session = cluster.connect(keyspace=self._keyspace)
return CassandraContextFactory(session)


Expand All @@ -125,11 +125,11 @@ class CassandraContextFactory(ContextFactory):
"""

def __init__(self, session: Session):
self.session = session
self.prepared_statements: Dict[str, PreparedStatement] = {}
self._session = session
self._prepared_statements: Dict[str, PreparedStatement] = {}

def make_object_for_context(self, name: str, span: Span) -> "CassandraSessionAdapter":
return CassandraSessionAdapter(name, span, self.session, self.prepared_statements)
return CassandraSessionAdapter(name, span, self._session, self._prepared_statements)


class CQLMapperClient(config.Parser):
Expand All @@ -145,12 +145,12 @@ class CQLMapperClient(config.Parser):
"""

def __init__(self, keyspace: str, **kwargs: Any):
self.keyspace = keyspace
self.kwargs = kwargs
self._keyspace = keyspace
self._kwargs = kwargs

def parse(self, key_path: str, raw_config: config.RawConfig) -> "CQLMapperContextFactory":
cluster = cluster_from_config(raw_config, prefix=f"{key_path}.", **self.kwargs)
session = cluster.connect(keyspace=self.keyspace)
cluster = cluster_from_config(raw_config, prefix=f"{key_path}.", **self._kwargs)
session = cluster.connect(keyspace=self._keyspace)
return CQLMapperContextFactory(session)


Expand Down Expand Up @@ -261,13 +261,13 @@ def __init__(
session: Session,
prepared_statements: Dict[str, PreparedStatement],
):
self.context_name = context_name
self.server_span = server_span
self.session = session
self.prepared_statements = prepared_statements
self._context_name = context_name
self._server_span = server_span
self._session = session
self._prepared_statements = prepared_statements

def __getattr__(self, name: str) -> Any:
return getattr(self.session, name)
return getattr(self._session, name)

def execute(
self,
Expand All @@ -285,8 +285,8 @@ def execute_async(
timeout: Union[float, object] = _NOT_SET,
**kwargs: Any,
) -> ResponseFuture:
trace_name = f"{self.context_name}.execute"
span = self.server_span.make_child(trace_name)
trace_name = f"{self._context_name}.execute"
span = self._server_span.make_child(trace_name)
span.start()
# TODO: include custom payload
if isinstance(query, str):
Expand All @@ -295,7 +295,9 @@ def execute_async(
span.set_tag("statement", query.query_string)
elif isinstance(query, BoundStatement):
span.set_tag("statement", query.prepared_statement.query_string)
future = self.session.execute_async(query, parameters=parameters, timeout=timeout, **kwargs)
future = self._session.execute_async(
query, parameters=parameters, timeout=timeout, **kwargs
)
future = wrap_future(
response_future=future,
callback_fn=_on_execute_complete,
Expand All @@ -317,14 +319,14 @@ def prepare(self, query: str, cache: bool = True) -> PreparedStatement:
"""
if cache:
try:
return self.prepared_statements[query]
return self._prepared_statements[query]
except KeyError:
pass

trace_name = f"{self.context_name}.prepare"
with self.server_span.make_child(trace_name) as span:
trace_name = f"{self._context_name}.prepare"
with self._server_span.make_child(trace_name) as span:
span.set_tag("statement", query)
prepared = self.session.prepare(query)
prepared = self._session.prepare(query)
if cache:
self.prepared_statements[query] = prepared
self._prepared_statements[query] = prepared
return prepared
65 changes: 36 additions & 29 deletions baseplate/clients/kombu.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,20 +118,20 @@ def __init__(
thrift_class: Type[T],
protocol_factory: TProtocolFactory = TBinaryProtocolAcceleratedFactory(),
):
self.thrift_class = thrift_class
self.factory = protocol_factory
self._thrift_class = thrift_class
self._factory = protocol_factory

@property
def name(self) -> str:
return f"thrift-{self.thrift_class.__name__}"
return f"thrift-{self._thrift_class.__name__}"

def serialize(self, obj: T) -> bytes:
if not isinstance(obj, self.thrift_class):
raise TypeError(f"object to serialize must be of {self.thrift_class.__name__} type")
return TSerialization.serialize(obj, self.factory)
if not isinstance(obj, self._thrift_class):
raise TypeError(f"object to serialize must be of {self._thrift_class.__name__} type")
return TSerialization.serialize(obj, self._factory)

def deserialize(self, message: bytes) -> T:
return TSerialization.deserialize(self.thrift_class(), message, self.factory)
return TSerialization.deserialize(self._thrift_class(), message, self._factory)


def register_serializer(serializer: KombuSerializer) -> None:
Expand Down Expand Up @@ -171,15 +171,17 @@ def __init__(
serializer: Optional[KombuSerializer] = None,
secrets: Optional[SecretsStore] = None,
):
self.max_connections = max_connections
self.serializer = serializer
self.secrets = secrets
self._max_connections = max_connections
self._serializer = serializer
self._secrets = secrets

def parse(self, key_path: str, raw_config: config.RawConfig) -> "KombuProducerContextFactory":
connection = connection_from_config(raw_config, prefix=f"{key_path}.", secrets=self.secrets)
connection = connection_from_config(
raw_config, prefix=f"{key_path}.", secrets=self._secrets
)
exchange = exchange_from_config(raw_config, prefix=f"{key_path}.")
return KombuProducerContextFactory(
connection, exchange, max_connections=self.max_connections, serializer=self.serializer
connection, exchange, max_connections=self._max_connections, serializer=self._serializer
)


Expand All @@ -205,14 +207,19 @@ def __init__(
max_connections: Optional[int] = None,
serializer: Optional[KombuSerializer] = None,
):
self.connection = connection
self.exchange = exchange
self.producers = Producers(limit=max_connections)
self.serializer = serializer
self._connection = connection
self._exchange = exchange
self._producers = Producers(limit=max_connections)
self._serializer = serializer

def make_object_for_context(self, name: str, span: Span) -> "_KombuProducer":
return _KombuProducer(
name, span, self.connection, self.exchange, self.producers, serializer=self.serializer
name,
span,
self._connection,
self._exchange,
self._producers,
serializer=self._serializer,
)


Expand All @@ -226,27 +233,27 @@ def __init__(
producers: Producers,
serializer: Optional[KombuSerializer] = None,
):
self.name = name
self.span = span
self.connection = connection
self.exchange = exchange
self.producers = producers
self.serializer = serializer
self._name = name
self._span = span
self._connection = connection
self._exchange = exchange
self._producers = producers
self._serializer = serializer

def publish(self, body: Any, routing_key: Optional[str] = None, **kwargs: Any) -> Any:
if self.serializer:
kwargs.setdefault("serializer", self.serializer.name)
if self._serializer:
kwargs.setdefault("serializer", self._serializer.name)

trace_name = f"{self.name}.publish"
child_span = self.span.make_child(trace_name)
trace_name = f"{self._name}.publish"
child_span = self._span.make_child(trace_name)

child_span.set_tag("kind", "producer")
if routing_key:
child_span.set_tag("message_bus.destination", routing_key)

with child_span:
producer_pool = self.producers[self.connection]
producer_pool = self._producers[self._connection]
with producer_pool.acquire(block=True) as producer:
return producer.publish(
body=body, routing_key=routing_key, exchange=self.exchange, **kwargs
body=body, routing_key=routing_key, exchange=self._exchange, **kwargs
)
Loading