diff --git a/Cargo.toml b/Cargo.toml
index b36edac5ed..755167b788 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,7 +3,6 @@ members = [
     "bytes",
     "communication",
     "container",
-    "kafkaesque",
     "logging",
     "timely",
 ]
diff --git a/kafkaesque/.gitignore b/kafkaesque/.gitignore
deleted file mode 100644
index ff0d8477be..0000000000
--- a/kafkaesque/.gitignore
+++ /dev/null
@@ -1,3 +0,0 @@
-/target
-/.vscode
-Cargo.lock
diff --git a/kafkaesque/Cargo.toml b/kafkaesque/Cargo.toml
deleted file mode 100644
index 6e1dbf5d83..0000000000
--- a/kafkaesque/Cargo.toml
+++ /dev/null
@@ -1,12 +0,0 @@
-[package]
-name = "kafkaesque"
-version = "0.12.0"
-authors = ["Frank McSherry <fmcsherry@me.com>"]
-edition = "2018"
-
-[dependencies]
-abomonation="0.7"
-timely = { path = "../timely" }
-
-[dependencies.rdkafka]
-version = "0.23.0"
diff --git a/kafkaesque/src/bin/capture_recv.rs b/kafkaesque/src/bin/capture_recv.rs
deleted file mode 100644
index 862d44cbf6..0000000000
--- a/kafkaesque/src/bin/capture_recv.rs
+++ /dev/null
@@ -1,46 +0,0 @@
-use timely::dataflow::operators::Inspect;
-use timely::dataflow::operators::capture::Replay;
-use timely::dataflow::operators::Accumulate;
-
-use rdkafka::config::ClientConfig;
-
-use kafkaesque::EventConsumer;
-
-fn main() {
-    timely::execute_from_args(std::env::args(), |worker| {
-
-        let topic = std::env::args().nth(1).unwrap();
-        let source_peers = std::env::args().nth(2).unwrap().parse::<usize>().unwrap();
-        let brokers = "localhost:9092";
-
-        // Create Kafka stuff.
-        let mut consumer_config = ClientConfig::new();
-        consumer_config
-            .set("produce.offset.report", "true")
-            .set("auto.offset.reset", "smallest")
-            .set("group.id", "example")
-            .set("enable.auto.commit", "false")
-            .set("enable.partition.eof", "false")
-            .set("auto.offset.reset", "earliest")
-            .set("session.timeout.ms", "6000")
-            .set("bootstrap.servers", &brokers);
-
-        // create replayers from disjoint partition of source worker identifiers.
-        let replayers =
-        (0 .. source_peers)
-            .filter(|i| i % worker.peers() == worker.index())
-            .map(|i| {
-                let topic = format!("{}-{:?}", topic, i);
-                EventConsumer::<_,u64>::new(consumer_config.clone(), topic)
-            })
-            .collect::<Vec<_>>();
-
-        worker.dataflow::<u64,_,_>(|scope| {
-            replayers
-                .replay_into(scope)
-                .count()
-                .inspect(|x| println!("replayed: {:?}", x))
-                ;
-        })
-    }).unwrap(); // asserts error-free execution
-}
diff --git a/kafkaesque/src/bin/capture_send.rs b/kafkaesque/src/bin/capture_send.rs
deleted file mode 100644
index ea25025076..0000000000
--- a/kafkaesque/src/bin/capture_send.rs
+++ /dev/null
@@ -1,31 +0,0 @@
-use timely::dataflow::operators::ToStream;
-use timely::dataflow::operators::capture::Capture;
-
-use rdkafka::config::ClientConfig;
-
-use kafkaesque::EventProducer;
-
-fn main() {
-    timely::execute_from_args(std::env::args(), |worker| {
-
-        // target topic name.
-        let topic = std::env::args().nth(1).unwrap();
-        let count = std::env::args().nth(2).unwrap().parse::<u64>().unwrap();
-        let brokers = "localhost:9092";
-
-        // Create Kafka stuff.
-        let mut producer_config = ClientConfig::new();
-        producer_config
-            .set("produce.offset.report", "true")
-            .set("bootstrap.servers", brokers);
-
-        let topic = format!("{}-{:?}", topic, worker.index());
-        let producer = EventProducer::new(producer_config, topic);
-
-        worker.dataflow::<u64,_,_>(|scope|
-            (0 .. count)
-                .to_stream(scope)
-                .capture_into(producer)
-        );
-    }).unwrap();
-}
diff --git a/kafkaesque/src/bin/kafka_source.rs b/kafkaesque/src/bin/kafka_source.rs
deleted file mode 100644
index f71f8df7fc..0000000000
--- a/kafkaesque/src/bin/kafka_source.rs
+++ /dev/null
@@ -1,63 +0,0 @@
-use timely::dataflow::operators::Inspect;
-
-use rdkafka::config::ClientConfig;
-use rdkafka::consumer::{Consumer, BaseConsumer, DefaultConsumerContext};
-
-fn main() {
-
-    let mut args = ::std::env::args();
-    args.next();
-
-    // Extract Kafka topic.
-    let topic = args.next().expect("Must specify a Kafka topic");
-    let brokers = "localhost:9092";
-
-    // Create Kafka consumer configuration.
-    // Feel free to change parameters here.
-    let mut consumer_config = ClientConfig::new();
-    consumer_config
-        .set("produce.offset.report", "true")
-        .set("auto.offset.reset", "smallest")
-        .set("group.id", "example")
-        .set("enable.auto.commit", "false")
-        .set("enable.partition.eof", "false")
-        .set("auto.offset.reset", "earliest")
-        .set("session.timeout.ms", "6000")
-        .set("bootstrap.servers", &brokers);
-
-    timely::execute_from_args(args, move |worker| {
-
-        // A dataflow for producing spans.
-        worker.dataflow::<u64,_,_>(|scope| {
-
-            // Create a Kafka consumer.
-            let consumer : BaseConsumer<DefaultConsumerContext> = consumer_config.create().expect("Couldn't create consumer");
-            consumer.subscribe(&[&topic]).expect("Failed to subscribe to topic");
-
-            let strings =
-            kafkaesque::source(scope, "KafkaStringSource", consumer, |bytes, capability, output| {
-
-                // If the bytes are utf8, convert to string and send.
-                if let Ok(text) = std::str::from_utf8(bytes) {
-                    output
-                        .session(capability)
-                        .give(text.to_string());
-                }
-
-                // We need some rule to advance timestamps ...
-                let time = *capability.time();
-                capability.downgrade(&(time + 1));
-
-                // Indicate that we are not yet done.
-                false
-            });
-
-            strings.inspect(|x| println!("Observed: {:?}", x));
-
-        });
-
-    }).expect("Timely computation failed somehow");
-
-    println!("Hello, world!");
-}
-
diff --git a/kafkaesque/src/kafka_source.rs b/kafkaesque/src/kafka_source.rs
deleted file mode 100644
index 0167913ddc..0000000000
--- a/kafkaesque/src/kafka_source.rs
+++ /dev/null
@@ -1,138 +0,0 @@
-use timely::Data;
-use timely::dataflow::{Scope, Stream};
-use timely::dataflow::operators::Capability;
-use timely::dataflow::operators::generic::OutputHandle;
-use timely::dataflow::channels::pushers::Tee;
-
-use rdkafka::Message;
-use rdkafka::consumer::{ConsumerContext, BaseConsumer};
-
-/// Constructs a stream of data from a Kafka consumer.
-///
-/// This method assembles a stream of data from a Kafka consumer and supplied
-/// user logic for determining how to interpret the binary data Kafka supplies.
-///
-/// The user logic is provided binary data as `&[u8]`, and mutable references to
-/// a capability and an output handle, which the logic should use to produce data
-/// if it is so inclined. The logic must return a bool indicating whether the stream
-/// is complete (true indicates that the operator should cease data production and
-/// shut down).
-///
-/// # Examples
-/// ```rust,no_run
-/// use timely::dataflow::operators::Inspect;
-///
-/// use rdkafka::Message;
-/// use rdkafka::config::ClientConfig;
-/// use rdkafka::consumer::{Consumer, BaseConsumer, DefaultConsumerContext};
-///
-/// fn main() {
-///
-///     let mut args = ::std::env::args();
-///     args.next();
-///
-///     // Extract Kafka topic.
-///     let topic = args.next().expect("Must specify a Kafka topic");
-///     let brokers = "localhost:9092";
-///
-///     // Create Kafka consumer configuration.
-///     // Feel free to change parameters here.
-///     let mut consumer_config = ClientConfig::new();
-///     consumer_config
-///         .set("produce.offset.report", "true")
-///         .set("auto.offset.reset", "smallest")
-///         .set("group.id", "example")
-///         .set("enable.auto.commit", "false")
-///         .set("enable.partition.eof", "false")
-///         .set("auto.offset.reset", "earliest")
-///         .set("session.timeout.ms", "6000")
-///         .set("bootstrap.servers", &brokers);
-///
-///     timely::execute_from_args(args, move |worker| {
-///
-///         // A dataflow for producing spans.
-///         worker.dataflow::<u64,_,_>(|scope| {
-///
-///             // Create a Kafka consumer.
-///             let consumer : BaseConsumer<DefaultConsumerContext> = consumer_config.create().expect("Couldn't create consumer");
-///             consumer.subscribe(&[&topic]).expect("Failed to subscribe to topic");
-///
-///             let strings =
-///             kafkaesque::source(scope, "KafkaStringSource", consumer, |bytes, capability, output| {
-///
-///                 // If the bytes are utf8, convert to string and send.
-///                 if let Ok(text) = std::str::from_utf8(bytes) {
-///                     output
-///                         .session(capability)
-///                         .give(text.to_string());
-///                 }
-///
-///                 // We need some rule to advance timestamps ...
-///                 let time = *capability.time();
-///                 capability.downgrade(&(time + 1));
-///
-///                 // Indicate that we are not yet done.
-///                 false
-///             });
-///
-///             strings.inspect(|x| println!("Observed: {:?}", x));
-///
-///         });
-///
-///     }).expect("Timely computation failed somehow");
-///
-///     println!("Hello, world!");
-/// }
-/// ```
-pub fn kafka_source<C, G, D, L>(
-    scope: &G,
-    name: &str,
-    consumer: BaseConsumer<C>,
-    logic: L
-) -> Stream<G, D>
-where
-    C: ConsumerContext+'static,
-    G: Scope,
-    D: Data,
-    L: Fn(&[u8],
-          &mut Capability<G::Timestamp>,
-          &mut OutputHandle<G::Timestamp, D, Tee<G::Timestamp, D>>) -> bool+'static,
-{
-    use timely::dataflow::operators::generic::source;
-    source(scope, name, move |capability, info| {
-
-        let activator = scope.activator_for(&info.address[..]);
-        let mut cap = Some(capability);
-
-        // define a closure to call repeatedly.
-        move |output| {
-
-            // Act only if we retain the capability to send data.
-            let mut complete = false;
-            if let Some(mut capability) = cap.as_mut() {
-
-                // Indicate that we should run again.
-                activator.activate();
-
-                // Repeatedly interrogate Kafka for [u8] messages.
-                // Cease only when Kafka stops returning new data.
-                // Could cease earlier, if we had a better policy.
-                while let Some(result) = consumer.poll(std::time::Duration::from_millis(0)) {
-                    // If valid data back from Kafka
-                    if let Ok(message) = result {
-                        // Attempt to interpret bytes as utf8 ...
-                        if let Some(payload) = message.payload() {
-                            complete = logic(payload, &mut capability, output) || complete;
-                        }
-                    }
-                    else {
-                        println!("Kafka error");
-                    }
-                }
-            }
-
-            if complete { cap = None; }
-        }
-
-    })
-}
\ No newline at end of file
diff --git a/kafkaesque/src/lib.rs b/kafkaesque/src/lib.rs
deleted file mode 100644
index ba91061e6e..0000000000
--- a/kafkaesque/src/lib.rs
+++ /dev/null
@@ -1,128 +0,0 @@
-use std::sync::Arc;
-use std::sync::atomic::{AtomicIsize, Ordering};
-
-use abomonation::Abomonation;
-use timely::dataflow::operators::capture::event::{EventCore, EventPusherCore, EventIteratorCore};
-
-use rdkafka::Message;
-use rdkafka::client::ClientContext;
-use rdkafka::config::ClientConfig;
-use rdkafka::producer::{BaseProducer, BaseRecord, ProducerContext, DeliveryResult};
-use rdkafka::consumer::{Consumer, BaseConsumer, DefaultConsumerContext};
-
-use rdkafka::config::FromClientConfigAndContext;
-
-pub mod kafka_source;
-pub use kafka_source::kafka_source as source;
-
-struct OutstandingCounterContext {
-    outstanding: Arc<AtomicIsize>,
-}
-
-impl ClientContext for OutstandingCounterContext { }
-
-impl ProducerContext for OutstandingCounterContext {
-    type DeliveryOpaque = ();
-    fn delivery(&self, _report: &DeliveryResult, _: Self::DeliveryOpaque) {
-        self.outstanding.fetch_sub(1, Ordering::SeqCst);
-    }
-}
-
-impl OutstandingCounterContext {
-    pub fn new(counter: &Arc<AtomicIsize>) -> Self {
-        OutstandingCounterContext {
-            outstanding: counter.clone()
-        }
-    }
-}
-
-/// A wrapper for `W: Write` implementing `EventPusher`.
-pub struct EventProducerCore<T, D> {
-    topic: String,
-    buffer: Vec<u8>,
-    producer: BaseProducer<OutstandingCounterContext>,
-    counter: Arc<AtomicIsize>,
-    phant: ::std::marker::PhantomData<(T,D)>,
-}
-
-/// [EventProducerCore] specialized to vector-based containers.
-pub type EventProducer<T, D> = EventProducerCore<T, Vec<D>>;
-
-impl<T, D> EventProducerCore<T, D> {
-    /// Allocates a new `EventWriter` wrapping a supplied writer.
-    pub fn new(config: ClientConfig, topic: String) -> Self {
-        let counter = Arc::new(AtomicIsize::new(0));
-        let context = OutstandingCounterContext::new(&counter);
-        let producer = BaseProducer::<OutstandingCounterContext>::from_config_and_context(&config, context).expect("Couldn't create producer");
-        println!("allocating producer for topic {:?}", topic);
-        Self {
-            topic: topic,
-            buffer: vec![],
-            producer: producer,
-            counter: counter,
-            phant: ::std::marker::PhantomData,
-        }
-    }
-}
-
-impl<T: Abomonation, D: Abomonation> EventPusherCore<T, D> for EventProducerCore<T, D> {
-    fn push(&mut self, event: EventCore<T, D>) {
-        unsafe { ::abomonation::encode(&event, &mut self.buffer).expect("Encode failure"); }
-        // println!("sending {:?} bytes", self.buffer.len());
-        self.producer.send::<(),[u8]>(BaseRecord::to(self.topic.as_str()).payload(&self.buffer[..])).unwrap();
-        self.counter.fetch_add(1, Ordering::SeqCst);
-        self.producer.poll(std::time::Duration::from_millis(0));
-        self.buffer.clear();
-    }
-}
-
-impl<T, D> Drop for EventProducerCore<T, D> {
-    fn drop(&mut self) {
-        while self.counter.load(Ordering::SeqCst) > 0 {
-            self.producer.poll(std::time::Duration::from_millis(10));
-        }
-    }
-}
-
-/// A Wrapper for `R: Read` implementing `EventIterator`.
-pub struct EventConsumerCore<T, D> {
-    consumer: BaseConsumer<DefaultConsumerContext>,
-    buffer: Vec<u8>,
-    phant: ::std::marker::PhantomData<(T,D)>,
-}
-
-/// [EventConsumerCore] specialized to vector-based containers.
-pub type EventConsumer<T, D> = EventConsumerCore<T, Vec<D>>;
-
-impl<T, D> EventConsumerCore<T, D> {
-    /// Allocates a new `EventReader` wrapping a supplied reader.
-    pub fn new(config: ClientConfig, topic: String) -> Self {
-        println!("allocating consumer for topic {:?}", topic);
-        let consumer : BaseConsumer<DefaultConsumerContext> = config.create().expect("Couldn't create consumer");
-        consumer.subscribe(&[&topic]).expect("Failed to subscribe to topic");
-        Self {
-            consumer: consumer,
-            buffer: Vec::new(),
-            phant: ::std::marker::PhantomData,
-        }
-    }
-}
-
-impl<T: Abomonation, D: Abomonation> EventIteratorCore<T, D> for EventConsumerCore<T, D> {
-    fn next(&mut self) -> Option<&EventCore<T, D>> {
-        if let Some(result) = self.consumer.poll(std::time::Duration::from_millis(0)) {
-            match result {
-                Ok(message) => {
-                    self.buffer.clear();
-                    self.buffer.extend_from_slice(message.payload().unwrap());
-                    Some(unsafe { ::abomonation::decode::<EventCore<T, D>>(&mut self.buffer[..]).unwrap().0 })
-                },
-                Err(err) => {
-                    println!("KafkaConsumer error: {:?}", err);
-                    None
-                },
-            }
-        }
-        else { None }
-    }
-}
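Note for anyone still depending on kafkaesque: with the crate dropped from the workspace, the Kafka-backed capture/replay round trip it provided has to come from a vendored copy or an older published release. The sketch below condenses the two deleted binaries (capture_send.rs and capture_recv.rs) into a single process purely for illustration; the topic name, broker address, element count, and single-process layout are assumptions of this sketch, since the originals ran as separate send and receive programs against a shared Kafka deployment.

// Illustrative sketch only: condenses the deleted capture_send.rs / capture_recv.rs
// examples. Assumes a vendored copy of the removed `kafkaesque` crate and a broker
// reachable at the placeholder address below.
use timely::dataflow::operators::{Inspect, ToStream};
use timely::dataflow::operators::capture::{Capture, Replay};

use rdkafka::config::ClientConfig;

use kafkaesque::{EventConsumer, EventProducer};

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {
        let brokers = "localhost:9092";
        // One topic per capturing worker, as in the deleted examples.
        let topic = format!("example-capture-{}", worker.index());

        // Send side: capture a stream of u64s into Kafka.
        let mut producer_config = ClientConfig::new();
        producer_config.set("bootstrap.servers", brokers);
        let producer = EventProducer::new(producer_config, topic.clone());

        worker.dataflow::<u64,_,_>(|scope| {
            (0 .. 10u64)
                .to_stream(scope)
                .capture_into(producer);
        });

        // Receive side: replay the captured stream from the same topic.
        let mut consumer_config = ClientConfig::new();
        consumer_config
            .set("group.id", "example")
            .set("enable.auto.commit", "false")
            .set("enable.partition.eof", "false")
            .set("auto.offset.reset", "earliest")
            .set("bootstrap.servers", brokers);
        let consumer = EventConsumer::<u64, u64>::new(consumer_config, topic);

        worker.dataflow::<u64,_,_>(|scope| {
            vec![consumer]
                .replay_into(scope)
                .inspect(|x| println!("replayed: {:?}", x));
        });
    }).unwrap();
}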