diff --git a/deploy/all-in-one.yaml b/deploy/all-in-one.yaml
index af36cdc1b..fa173fb84 100644
--- a/deploy/all-in-one.yaml
+++ b/deploy/all-in-one.yaml
@@ -42,6 +42,7 @@ apiVersion: v1
 data:
   gateway.yaml: |-
     port: 8080
+    sink_port: 8082
     controllers:
       - vanus-controller-0.vanus-controller:2048
       - vanus-controller-1.vanus-controller:2048
@@ -176,6 +177,10 @@ spec:
       - env:
         - name: VANUS_LOG_LEVEL
           value: INFO
+        - name: POD_IP
+          valueFrom:
+            fieldRef:
+              fieldPath: status.podIP
         image: public.ecr.aws/vanus/gateway:v0.5.1
         imagePullPolicy: IfNotPresent
         name: gateway
@@ -184,6 +189,8 @@ spec:
           name: proxy
         - containerPort: 8081
           name: cloudevents
+        - containerPort: 8082
+          name: sinkproxy
         volumeMounts:
         - mountPath: /vanus/config
           name: config-gateway
diff --git a/deploy/yaml/gateway.yaml b/deploy/yaml/gateway.yaml
index 60c1a99c2..abdd76bd8 100644
--- a/deploy/yaml/gateway.yaml
+++ b/deploy/yaml/gateway.yaml
@@ -25,6 +25,7 @@ metadata:
 data:
   gateway.yaml: |-
     port: 8080
+    sink_port: 8082
     controllers:
       - vanus-controller-0.vanus-controller:2048
       - vanus-controller-1.vanus-controller:2048
@@ -58,9 +59,15 @@ spec:
               containerPort: 8080
             - name: cloudevents
               containerPort: 8081
+            - name: sinkproxy
+              containerPort: 8082
           env:
             - name: VANUS_LOG_LEVEL
               value: INFO
+            - name: POD_IP
+              valueFrom:
+                fieldRef:
+                  fieldPath: status.podIP
           volumeMounts:
             - name: config-gateway
               mountPath: /vanus/config
diff --git a/go.mod b/go.mod
index cf8c85a46..4cc889100 100644
--- a/go.mod
+++ b/go.mod
@@ -91,8 +91,10 @@ require (
 	github.com/jonboulle/clockwork v0.2.2 // indirect
 	github.com/jtolds/gls v4.20.0+incompatible // indirect
 	github.com/klauspost/compress v1.13.6 // indirect
+	github.com/kr/pretty v0.3.0 // indirect
+	github.com/linkall-labs/sdk/proto v0.0.0-20230106022440-7302e243c0b6 // indirect
 	github.com/mattn/go-colorable v0.1.11 // indirect
-	github.com/mattn/go-isatty v0.0.14 // indirect
+	github.com/mattn/go-isatty v0.0.16 // indirect
 	github.com/mattn/go-runewidth v0.0.13 // indirect
 	github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
 	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
@@ -103,6 +105,7 @@ require (
 	github.com/prometheus/common v0.37.0 // indirect
 	github.com/prometheus/procfs v0.8.0 // indirect
 	github.com/rivo/uniseg v0.2.0 // indirect
+	github.com/rogpeppe/go-internal v1.8.0 // indirect
 	github.com/scylladb/go-set v1.0.2 // indirect
 	github.com/sirupsen/logrus v1.9.0 // indirect
 	github.com/smartystreets/assertions v1.2.0 // indirect
@@ -134,12 +137,13 @@ require (
 	go.uber.org/multierr v1.7.0 // indirect
 	go.uber.org/zap v1.17.0 // indirect
 	golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
-	golang.org/x/net v0.0.0-20221014081412-f15817d10f9b // indirect
+	golang.org/x/net v0.4.0 // indirect
 	golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 // indirect
 	golang.org/x/sync v0.1.0 // indirect
-	golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8 // indirect
-	golang.org/x/text v0.4.0 // indirect
+	golang.org/x/sys v0.3.0 // indirect
+	golang.org/x/text v0.5.0 // indirect
 	google.golang.org/appengine v1.6.7 // indirect
+	gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
 	gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
 	gopkg.in/yaml.v2 v2.4.0 // indirect
 	sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect
diff --git a/go.sum b/go.sum
index e4be51190..e8029878c 100644
--- a/go.sum
+++ b/go.sum
@@ -284,19 +284,26 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxv
 github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
 github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
-github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs=
+github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
+github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
+github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
 github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
 github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
 github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
 github.com/linkall-labs/embed-etcd v0.1.2 h1:1mTdXLwVvn9gi3XWh/PGhaEAfG8Zmxvjqwnfontb+fA=
 github.com/linkall-labs/embed-etcd v0.1.2/go.mod h1:QnecHaKt3WQBO9YGBckCDUTBd44VBR2VO8220BtWZ5U=
+github.com/linkall-labs/sdk v0.0.0-20230104145200-8f0caa58bf30 h1:Ok2X5KwupdYiWStN3s/fMWm8FefVicPEq48zGNAdNG8=
+github.com/linkall-labs/sdk v0.0.0-20230104145200-8f0caa58bf30/go.mod h1:rQKEHYp/hWhdB20Ql2unwbwXYXdwnX77xnczKiWCPBk=
+github.com/linkall-labs/sdk/proto v0.0.0-20230106022440-7302e243c0b6 h1:aJkIEUfZvoLzoqU/Ea1m+zMozqLh8fw1FneMmac1G54=
+github.com/linkall-labs/sdk/proto v0.0.0-20230106022440-7302e243c0b6/go.mod h1:NeFfkM8UVIDLDCiJo5+uO+ZuoEi07ffWUgObCBJAslQ=
 github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
 github.com/mattn/go-colorable v0.1.11 h1:nQ+aFkoE2TMGc0b68U2OKSexC+eq46+XwZzWXHRmPYs=
 github.com/mattn/go-colorable v0.1.11/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
 github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
-github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y=
 github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
+github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ=
+github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
 github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU=
 github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
 github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
@@ -314,7 +321,6 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW
 github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
 github.com/ncw/directio v1.0.5 h1:JSUBhdjEvVaJvOoyPAbcW0fnd0tvRXD76wEfZ1KcQz4=
 github.com/ncw/directio v1.0.5/go.mod h1:rX/pKEYkOXBGOggmcyJeJGloCkleSvphPx2eV3t6ROk=
-github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
 github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
 github.com/ohler55/ojg v1.14.5 h1:xCX2oyh/ZaoesbLH6fwVHStSJpk4o4eJs8ttXutzdg0=
@@ -322,6 +328,7 @@ github.com/ohler55/ojg v1.14.5/go.mod h1:7Ghirupn8NC8hSSDpI0gcjorPxj+vSVIONDWfli
 github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
 github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
 github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
+github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
 github.com/panjf2000/ants/v2 v2.7.1 h1:qBy5lfSdbxvrR0yUnZfaEDjf0FlCw4ufsbcsxmE7r+M=
 github.com/panjf2000/ants/v2 v2.7.1/go.mod h1:KIBmYG9QQX5U2qzFP/yQJaq/nSb6rahS9iEHkrCMgM8=
 github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@@ -363,6 +370,9 @@ github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
 github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
 github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
 github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
+github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
+github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8=
+github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
 github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
 github.com/scylladb/go-set v1.0.2 h1:SkvlMCKhP0wyyct6j+0IHJkBkSZL+TDzZ4E7f7BCcRE=
 github.com/scylladb/go-set v1.0.2/go.mod h1:DkpGd78rljTxKAnTDPFqXSGxvETQnJyuSOQwsHycqfs=
@@ -563,8 +573,8 @@ golang.org/x/net v0.0.0-20211029224645-99673261e6eb/go.mod h1:9nx3DQGgdP8bBQD5qx
 golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
 golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
-golang.org/x/net v0.0.0-20221014081412-f15817d10f9b h1:tvrvnPFcdzp294diPnrdZZZ8XUt2Tyj7svb7X52iDuU=
-golang.org/x/net v0.0.0-20221014081412-f15817d10f9b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
+golang.org/x/net v0.4.0 h1:Q5QPcMlvfxFTAPV0+07Xz/MpK9NTXu2VDUuy0FeMfaU=
+golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
 golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -635,8 +645,9 @@ golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBc
 golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8 h1:h+EGohizhe9XlX18rfpa8k8RAc5XyaeamM+0VHRd4lc=
-golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ=
+golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
 golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
 golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -647,8 +658,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
-golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg=
-golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
+golang.org/x/text v0.5.0 h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM=
+golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
 golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
@@ -807,8 +818,9 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
-gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
 gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
+gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
 gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
 gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
 gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
diff --git a/internal/gateway/config.go b/internal/gateway/config.go
index d7726bb33..14a72f3bd 100644
--- a/internal/gateway/config.go
+++ b/internal/gateway/config.go
@@ -23,6 +23,7 @@ import (
 
 type Config struct {
 	Port                 int                  `yaml:"port"`
+	SinkPort             int                  `yaml:"sink_port"`
 	Observability        observability.Config `yaml:"observability"`
 	ControllerAddr       []string             `yaml:"controllers"`
 	GRPCReflectionEnable bool                 `yaml:"grpc_reflection_enable"`
@@ -31,6 +32,7 @@ type Config struct {
 func (c Config) GetProxyConfig() proxy.Config {
 	return proxy.Config{
 		Endpoints:              c.ControllerAddr,
+		SinkPort:               c.SinkPort,
 		ProxyPort:              c.Port,
 		CloudEventReceiverPort: c.GetCloudEventReceiverPort(),
 		GRPCReflectionEnable:   c.GRPCReflectionEnable,
diff --git a/internal/gateway/proxy/proxy.go b/internal/gateway/proxy/proxy.go
index 9f1dffe00..91906a186 100644
--- a/internal/gateway/proxy/proxy.go
+++ b/internal/gateway/proxy/proxy.go
@@ -21,13 +21,20 @@ import (
 	"fmt"
 	"net"
 	"net/http"
+	"os"
 	"runtime/debug"
 	"strings"
 	"sync"
+	"sync/atomic"
+	stdtime "time"
 
 	v2 "github.com/cloudevents/sdk-go/v2"
+	"github.com/cloudevents/sdk-go/v2/client"
+	"github.com/cloudevents/sdk-go/v2/protocol"
+	cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
 	"github.com/cloudevents/sdk-go/v2/types"
 	recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
+	vanuspb "github.com/linkall-labs/sdk/proto/pkg/vanus"
 	eb "github.com/linkall-labs/vanus/client"
 	"github.com/linkall-labs/vanus/client/pkg/api"
 	"github.com/linkall-labs/vanus/client/pkg/option"
@@ -53,20 +60,32 @@ import (
 	"google.golang.org/grpc/credentials/insecure"
 	"google.golang.org/grpc/reflection"
 	"google.golang.org/grpc/status"
+	"google.golang.org/protobuf/types/known/anypb"
 	"google.golang.org/protobuf/types/known/emptypb"
+	"google.golang.org/protobuf/types/known/timestamppb"
 	"google.golang.org/protobuf/types/known/wrapperspb"
 )
 
 const (
 	maximumNumberPerGetRequest = 64
+	eventChanCache             = 10
+	ContentTypeProtobuf        = "application/protobuf"
+	httpRequestPrefix          = "/gatewaysink"
+	datacontenttype            = "datacontenttype"
+	dataschema                 = "dataschema"
+	subject                    = "subject"
+	time                       = "time"
 )
 
 var (
-	errInvalidEventbus = errors.New("the eventbus name can't be empty")
+	errInvalidEventbus     = errors.New("the eventbus name can't be empty")
+	requestDataFromContext = cehttp.RequestDataFromContext
+	zeroTime               = stdtime.Time{}
 )
 
 type Config struct {
 	Endpoints              []string
+	SinkPort               int
 	ProxyPort              int
 	CloudEventReceiverPort int
 	Credentials            credentials.TransportCredentials
@@ -77,6 +96,39 @@ var (
 	_ cloudevents.CloudEventsServer = &ControllerProxy{}
 )
 
+type ackCallback func(bool)
+
+type message struct {
+	sequenceID uint64
+	event      *v2.Event
+}
+
+type subscribeCache struct {
+	sequenceID      uint64
+	subscriptionID  string
+	subscribeStream vanuspb.Client_SubscribeServer
+	acks            sync.Map
+	eventc          chan message
+}
+
+func newSubscribeCache(subscriptionID string, stream vanuspb.Client_SubscribeServer) *subscribeCache {
+	return &subscribeCache{
+		sequenceID:      0,
+		subscriptionID:  subscriptionID,
+		subscribeStream: stream,
+		acks:            sync.Map{},
+		eventc:          make(chan message, eventChanCache),
+	}
+}
+
+func (s *subscribeCache) ch() chan message {
+	return s.eventc
+}
+
+func (s *subscribeCache) stream() vanuspb.Client_SubscribeServer {
+	return s.subscribeStream
+}
+
 type ControllerProxy struct {
 	cfg          Config
 	tracer       *tracing.Tracer
@@ -87,6 +139,260 @@ type ControllerProxy struct {
 	grpcSrv      *grpc.Server
 	ctrl         cluster.Cluster
 	writerMap    sync.Map
+	cache        sync.Map
+}
+
+func (cp *ControllerProxy) Publish(ctx context.Context, req *vanuspb.PublishRequest) (*emptypb.Empty, error) {
+	_ctx, span := cp.tracer.Start(ctx, "Publish")
+	defer span.End()
+	if req.EventbusName == "" {
+		return nil, v2.NewHTTPResult(http.StatusBadRequest, "invalid eventbus name")
+	}
+
+	for idx := range req.Events.Events {
+		e := req.Events.Events[idx]
+		err := checkExtension(e.Attributes)
+		if err != nil {
+			return nil, v2.NewHTTPResult(http.StatusBadRequest, err.Error())
+		}
+		e.Attributes[primitive.XVanusEventbus] = &cloudevents.CloudEvent_CloudEventAttributeValue{
+			Attr: &cloudevents.CloudEvent_CloudEventAttributeValue_CeString{CeString: req.EventbusName},
+		}
+		if eventTime, ok := e.Attributes[primitive.XVanusDeliveryTime]; ok {
+			// validate event time
+			if _, err := types.ParseTime(eventTime.String()); err != nil {
+				log.Error(_ctx, "invalid format of event time", map[string]interface{}{
+					log.KeyError: err,
+					"eventTime":  eventTime.String(),
+				})
+				return nil, v2.NewHTTPResult(http.StatusBadRequest, "invalid delivery time")
+			}
+			// TODO process delay message
+			// ebName = primitive.TimerEventbusName
+		}
+	}
+
+	err := cp.client.Eventbus(ctx, req.GetEventbusName()).Writer().AppendBatch(_ctx, req.GetEvents())
+	if err != nil {
+		log.Warning(_ctx, "append to failed", map[string]interface{}{
+			log.KeyError: err,
+			"eventbus":   req.EventbusName,
+		})
+		return nil, v2.NewHTTPResult(http.StatusInternalServerError, err.Error())
+	}
+	return &emptypb.Empty{}, nil
+}
+
+func (cp *ControllerProxy) Subscribe(req *vanuspb.SubscribeRequest, stream vanuspb.Client_SubscribeServer) error {
+	_ctx, span := cp.tracer.Start(context.Background(), "Subscribe")
+	defer span.End()
+
+	// 1. modify subscription sink
+	subscriptionID, err := vanus.NewIDFromString(req.SubscriptionId)
+	if err != nil {
+		log.Error(_ctx, "parse subscription id failed", map[string]interface{}{
+			log.KeyError: err,
+			"id":         req.SubscriptionId,
+		})
+		return err
+	}
+
+	getSubscriptionReq := &ctrlpb.GetSubscriptionRequest{
+		Id: subscriptionID.Uint64(),
+	}
+	meta, err := cp.triggerCtrl.GetSubscription(context.Background(), getSubscriptionReq)
+	if err != nil {
+		log.Error(_ctx, "get subscription failed", map[string]interface{}{
+			log.KeyError: err,
+			"id":         req.SubscriptionId,
+		})
+		return err
+	}
+
+	newSink := fmt.Sprintf("http://%s:%d%s/%s",
+		os.Getenv("POD_IP"), cp.cfg.SinkPort, httpRequestPrefix, req.SubscriptionId)
+	if meta.Sink != newSink {
+		updateSubscriptionReq := &ctrlpb.UpdateSubscriptionRequest{
+			Id: subscriptionID.Uint64(),
+			Subscription: &ctrlpb.SubscriptionRequest{
+				Source:      meta.Source,
+				Types:       meta.Types,
+				Config:      meta.Config,
+				Filters:     meta.Filters,
+				Sink:        newSink,
+				Protocol:    meta.Protocol,
+				EventBus:    meta.EventBus,
+				Transformer: meta.Transformer,
+				Name:        meta.Name,
+				Description: meta.Description,
+				Disable:     meta.Disable,
+			},
+		}
+		_, err = cp.triggerCtrl.UpdateSubscription(_ctx, updateSubscriptionReq)
+		if err != nil {
+			log.Error(_ctx, "update subscription sink failed", map[string]interface{}{
+				log.KeyError: err,
+				"id":         req.SubscriptionId,
+			})
+			return err
+		}
+	}
+
+	// 2. cache subscribe info
+	subscribe := newSubscribeCache(req.SubscriptionId, stream)
+	cp.cache.Store(req.SubscriptionId, subscribe)
+
+	// 3. receive and forward events
+	for {
+		select {
+		case <-_ctx.Done():
+			return errors.ErrInternal.WithMessage("subscribe stream context done")
+		case msg := <-subscribe.ch():
+			eventpb, err := ToProto(msg.event)
+			if err != nil {
+				// TODO(jiangkai): err check
+				log.Error(_ctx, "to eventpb failed", map[string]interface{}{
+					log.KeyError: err,
+					"event":      msg.event,
+				})
+				break
+			}
+			log.Debug(_ctx, "subscribe stream send event", map[string]interface{}{
+				log.KeyError: err,
+				"eventpb":    eventpb.String(),
+			})
+			err = subscribe.stream().Send(&vanuspb.SubscribeResponse{
+				SequenceId: msg.sequenceID,
+				Events: &cloudevents.CloudEventBatch{
+					Events: []*cloudevents.CloudEvent{eventpb},
+				},
+			})
+			if err != nil {
+				cache, _ := cp.cache.LoadAndDelete(subscribe.subscriptionID)
+				if cache != nil {
+					cache.(*subscribeCache).acks.Range(func(key, value interface{}) bool {
+						value.(ackCallback)(false)
+						return true
+					})
+				}
+				return err
+			}
+		}
+	}
+}
+
+func (cp *ControllerProxy) Ack(stream vanuspb.Client_AckServer) error {
+	_ctx, span := cp.tracer.Start(context.Background(), "Ack")
+	defer span.End()
+	for {
+		rsp, err := stream.Recv()
+		if err != nil {
+			log.Error(_ctx, "ack stream recv failed", map[string]interface{}{
+				log.KeyError: err,
+			})
+			return err
+		}
+		log.Debug(_ctx, "ack stream recv a response", map[string]interface{}{
+			log.KeyError: err,
+			"rsp":        rsp,
+		})
+		cache, ok := cp.cache.Load(rsp.SubscriptionId)
+		if !ok {
+			// TODO(jiangkai): err check
+			log.Error(_ctx, "subscription not found", map[string]interface{}{
+				log.KeyError:      err,
+				"subscription-id": rsp.SubscriptionId,
+			})
+			continue
+		}
+		cb, _ := cache.(*subscribeCache).acks.LoadAndDelete(rsp.SequenceId)
+		if cb != nil {
+			cb.(ackCallback)(rsp.Success)
+		}
+	}
+}
+
+func ToProto(e *v2.Event) (*cloudevents.CloudEvent, error) {
+	container := &cloudevents.CloudEvent{
+		Id:          e.ID(),
+		Source:      e.Source(),
+		SpecVersion: e.SpecVersion(),
+		Type:        e.Type(),
+		Attributes:  make(map[string]*cloudevents.CloudEvent_CloudEventAttributeValue),
+	}
+	if e.DataContentType() != "" {
+		container.Attributes[datacontenttype], _ = attributeFor(e.DataContentType())
+	}
+	if e.DataSchema() != "" {
+		container.Attributes[dataschema], _ = attributeFor(e.DataSchema())
+	}
+	if e.Subject() != "" {
+		container.Attributes[subject], _ = attributeFor(e.Subject())
+	}
+	if e.Time() != zeroTime {
+		container.Attributes[time], _ = attributeFor(e.Time())
+	}
+	for name, value := range e.Extensions() {
+		attr, err := attributeFor(value)
+		if err != nil {
+			return nil, fmt.Errorf("failed to encode attribute %s: %w", name, err)
+		}
+		container.Attributes[name] = attr
+	}
+	container.Data = &cloudevents.CloudEvent_BinaryData{
+		BinaryData: e.Data(),
+	}
+	if e.DataContentType() == ContentTypeProtobuf {
+		anymsg := &anypb.Any{
+			TypeUrl: e.DataSchema(),
+			Value:   e.Data(),
+		}
+		container.Data = &cloudevents.CloudEvent_ProtoData{
+			ProtoData: anymsg,
+		}
+	}
+	return container, nil
+}
+
+func attributeFor(v interface{}) (*cloudevents.CloudEvent_CloudEventAttributeValue, error) {
+	vv, err := types.Validate(v)
+	if err != nil {
+		return nil, err
+	}
+	attr := &cloudevents.CloudEvent_CloudEventAttributeValue{}
+	switch vt := vv.(type) {
+	case bool:
+		attr.Attr = &cloudevents.CloudEvent_CloudEventAttributeValue_CeBoolean{
+			CeBoolean: vt,
+		}
+	case int32:
+		attr.Attr = &cloudevents.CloudEvent_CloudEventAttributeValue_CeInteger{
+			CeInteger: vt,
+		}
+	case string:
+		attr.Attr = &cloudevents.CloudEvent_CloudEventAttributeValue_CeString{
+			CeString: vt,
+		}
+	case []byte:
+		attr.Attr = &cloudevents.CloudEvent_CloudEventAttributeValue_CeBytes{
+			CeBytes: vt,
+		}
+	case types.URI:
+		attr.Attr = &cloudevents.CloudEvent_CloudEventAttributeValue_CeUri{
+			CeUri: vt.String(),
+		}
+	case types.URIRef:
+		attr.Attr = &cloudevents.CloudEvent_CloudEventAttributeValue_CeUriRef{
+			CeUriRef: vt.String(),
+		}
+	case types.Timestamp:
+		attr.Attr = &cloudevents.CloudEvent_CloudEventAttributeValue_CeTimestamp{
+			CeTimestamp: timestamppb.New(vt.Time),
+		}
+	default:
+		return nil, fmt.Errorf("unsupported attribute type: %T", v)
+	}
+	return attr, nil
 }
 
 func (cp *ControllerProxy) Send(ctx context.Context, batch *cloudevents.BatchEvent) (*emptypb.Empty, error) {
@@ -202,8 +508,9 @@ func (cp *ControllerProxy) Start() error {
 
 	proxypb.RegisterControllerProxyServer(cp.grpcSrv, cp)
 	cloudevents.RegisterCloudEventsServer(cp.grpcSrv, cp)
+	vanuspb.RegisterClientServer(cp.grpcSrv, cp)
 
-	listen, err := net.Listen("tcp", fmt.Sprintf(":%d", cp.cfg.ProxyPort))
+	proxyListen, err := net.Listen("tcp", fmt.Sprintf(":%d", cp.cfg.ProxyPort))
 	if err != nil {
 		return err
 	}
@@ -211,16 +518,81 @@ func (cp *ControllerProxy) Start() error {
 	wg := sync.WaitGroup{}
 	wg.Add(1)
 	go func() {
-		err = cp.grpcSrv.Serve(listen)
+		err = cp.grpcSrv.Serve(proxyListen)
 		if err != nil {
 			panic(fmt.Sprintf("start grpc proxy failed: %s", err.Error()))
 		}
 		wg.Done()
 	}()
 	log.Info(context.Background(), "the grpc proxy ready to work", nil)
+
+	sinkListen, err := net.Listen("tcp", fmt.Sprintf(":%d", cp.cfg.SinkPort))
+	if err != nil {
+		return err
+	}
+
+	c, err := client.NewHTTP(cehttp.WithListener(sinkListen), cehttp.WithRequestDataAtContextMiddleware())
+	if err != nil {
+		return err
+	}
+
+	wg.Add(1)
+	go func() {
+		if err := c.StartReceiver(context.Background(), cp.receive); err != nil {
+			panic(fmt.Sprintf("start CloudEvents receiver failed: %s", err.Error()))
+		}
+		wg.Done()
+	}()
+	log.Info(context.Background(), "the sink proxy ready to work", nil)
 	return nil
 }
 
+func (cp *ControllerProxy) receive(ctx context.Context, event v2.Event) (*v2.Event, protocol.Result) {
+	_ctx, span := cp.tracer.Start(ctx, "receive")
+	defer span.End()
+	subscriptionID := getSubscriptionIDFromPath(requestDataFromContext(_ctx))
+	if subscriptionID == "" {
+		return nil, v2.NewHTTPResult(http.StatusBadRequest, "invalid subscription id")
+	}
+	cache, ok := cp.cache.Load(subscriptionID)
+	if !ok {
+		// retry
+		return nil, v2.NewHTTPResult(http.StatusInternalServerError, "subscription not exist")
+	}
+	log.Debug(_ctx, "sink proxy received a event", map[string]interface{}{
+		"event": event.String(),
+	})
+	sequenceID := atomic.AddUint64(&cache.(*subscribeCache).sequenceID, 1)
+	var success bool
+	donec := make(chan struct{})
+	cache.(*subscribeCache).acks.Store(sequenceID, ackCallback(func(result bool) {
+		log.Info(_ctx, "ack callback", map[string]interface{}{
+			"result": result,
+		})
+		success = result
+		close(donec)
+	}))
+	cache.(*subscribeCache).eventc <- message{
+		sequenceID: sequenceID,
+		event:      &event,
+	}
+	<-donec
+
+	if !success {
+		return nil, v2.NewHTTPResult(http.StatusInternalServerError, "event processing failed")
+	}
+	return nil, v2.ResultACK
+}
+
+func getSubscriptionIDFromPath(reqData *cehttp.RequestData) string {
+	// TODO validate
+	reqPathStr := reqData.URL.String()
+	if !strings.HasPrefix(reqPathStr, httpRequestPrefix) {
+		return ""
+	}
+	return strings.TrimLeft(reqPathStr[len(httpRequestPrefix):], "/")
+}
+
 func (cp *ControllerProxy) Stop() {
 	if cp.grpcSrv != nil {
 		cp.grpcSrv.GracefulStop()