forked from dapr/python-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path__init__.py
117 lines (103 loc) · 4.63 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# -*- coding: utf-8 -*-
"""
Copyright (c) Microsoft Corporation and Dapr Contributors.
Licensed under the MIT License.
"""
from typing import Callable, Dict, List, Optional, Union
from dapr.clients.base import DaprActorClientBase
from dapr.clients.exceptions import DaprInternalError, ERROR_CODE_UNKNOWN
from dapr.clients.grpc.client import DaprGrpcClient, MetadataTuple, InvokeMethodResponse
from dapr.clients.http.dapr_actor_http_client import DaprActorHttpClient
from dapr.clients.http.dapr_invocation_http_client import DaprInvocationHttpClient
from dapr.conf import settings
from google.protobuf.message import Message as GrpcMessage
__all__ = [
'DaprClient',
'DaprActorClientBase',
'DaprActorHttpClient',
'DaprInternalError',
'ERROR_CODE_UNKNOWN',
]
from grpc import ( # type: ignore
UnaryUnaryClientInterceptor,
UnaryStreamClientInterceptor,
StreamUnaryClientInterceptor,
StreamStreamClientInterceptor
)
class DaprClient(DaprGrpcClient):
"""The Dapr python-sdk uses gRPC for most operations. The exception being
service invocation which needs to support HTTP to HTTP invocations. The sdk defaults
to HTTP but can be overridden with the DAPR_API_METHOD_INVOCATION_PROTOCOL environment
variable. See: https://github.com/dapr/python-sdk/issues/176 for more details"""
def __init__(
self,
address: Optional[str] = None,
headers_callback: Optional[Callable[[], Dict[str, str]]] = None,
interceptors: Optional[List[Union[
UnaryUnaryClientInterceptor,
UnaryStreamClientInterceptor,
StreamUnaryClientInterceptor,
StreamStreamClientInterceptor]]] = None,
http_timeout_seconds: Optional[int] = None):
"""Connects to Dapr Runtime and via gRPC and HTTP.
Args:
address (str, optional): Dapr Runtime gRPC endpoint address.
headers_callback (lambda: Dict[str, str]], optional): Generates header for each request.
interceptors (list of UnaryUnaryClientInterceptor or
UnaryStreamClientInterceptor or
StreamUnaryClientInterceptor or
StreamStreamClientInterceptor, optional): gRPC interceptors.
http_timeout_seconds (int): specify a timeout for http connections
"""
super().__init__(address, interceptors)
self.invocation_client = None
invocation_protocol = settings.DAPR_API_METHOD_INVOCATION_PROTOCOL.upper()
if invocation_protocol == 'HTTP':
if http_timeout_seconds is None:
http_timeout_seconds = settings.DAPR_HTTP_TIMEOUT_SECONDS
self.invocation_client = DaprInvocationHttpClient(headers_callback=headers_callback,
timeout=http_timeout_seconds)
elif invocation_protocol == 'GRPC':
pass
else:
raise DaprInternalError(
f'Unknown value for DAPR_API_METHOD_INVOCATION_PROTOCOL: {invocation_protocol}')
def invoke_method(
self,
app_id: str,
method_name: str,
data: Union[bytes, str, GrpcMessage],
content_type: Optional[str] = None,
metadata: Optional[MetadataTuple] = None,
http_verb: Optional[str] = None,
http_querystring: Optional[MetadataTuple] = None) -> InvokeMethodResponse:
"""Invoke a service method over gRPC or HTTP.
Args:
app_id (str): Application Id.
method_name (str): Method to be invoked.
data (bytes or str or GrpcMessage, optional): Data for requet's body.
content_type (str, optional): Content type of the data.
metadata (MetadataTuple, optional): Additional metadata or headers.
http_verb (str, optional): HTTP verb for the request.
http_querystring (MetadataTuple, optional): Query parameters.
Returns:
InvokeMethodResponse: the response from the method invocation.
"""
if self.invocation_client:
return self.invocation_client.invoke_method(
app_id,
method_name,
data,
content_type=content_type,
metadata=metadata,
http_verb=http_verb,
http_querystring=http_querystring)
else:
return super().invoke_method(
app_id,
method_name,
data,
content_type=content_type,
metadata=metadata,
http_verb=http_verb,
http_querystring=http_querystring)