Skip to content

Commit

Permalink
Merge pull request #1120 from neha0305verma/completed-on
Browse files Browse the repository at this point in the history
DIK-3360 : completedOn fix
  • Loading branch information
amitpriyadarshi authored Jul 30, 2020
2 parents 463a98a + 0f54f35 commit caf295f
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 25 deletions.
2 changes: 1 addition & 1 deletion platform-jobs/samza/course-batch-updater/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<scala.version>2.11</scala.version>
<hadoop.version>2.6.2</hadoop.version>
</properties>
<version>0.0.45</version>
<version>0.0.48</version>
<dependencies>
<dependency>
<groupId>org.ekstep</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Map;
import java.util.Set;
import java.util.Arrays;
import java.util.Iterator;
import java.util.stream.Collectors;

public class CourseBatchUpdater extends BaseCourseBatchUpdater {
Expand Down Expand Up @@ -206,12 +207,22 @@ public void updateBatchProgress(Session cassandraSession, CourseProgressHandler
dataToUpdate.putAll((Map<String, Object>) event.getValue());
dataToSelect.put("courseid", dataToUpdate.remove("courseId"));
if(((Number) dataToUpdate.get("status")).intValue() == 2) {
//adding userCourseBatch for auto certificate generation
userCertificateEvents.add((Map<String, Object>) dataToUpdate.get("userCourseBatch"));
//read status and completedOn from cassandra
Map<String, Object> result = readQuery(cassandraSession, dataToSelect);
LOGGER.info("CourseBatchUpdater:updateBatchProgress: result:: " + result);
if (MapUtils.isEmpty(result) || (((Number) result.get("status")).intValue() != 2 || (((Number) result.get("status")).intValue() == 2 && result.get("completedOn") == null))) {
//adding userCourseBatch for auto certificate generation
userCertificateEvents.add((Map<String, Object>) dataToUpdate.get("userCourseBatch"));
LOGGER.info("CourseBatchUpdater:updateBatchProgress: after auto cert:: ");
dataToUpdate.remove("userCourseBatch");
//Update cassandra
updateQueryList.add(updateQuery(keyspace, table, dataToUpdate, dataToSelect));
}
} else {
dataToUpdate.remove("userCourseBatch");
//Update cassandra
updateQueryList.add(updateQuery(keyspace, table, dataToUpdate, dataToSelect));
}
//Update cassandra
updateQueryList.add(updateQuery(keyspace, table, dataToUpdate, dataToSelect));
} catch (Exception e) {
e.printStackTrace();
}
Expand Down Expand Up @@ -302,4 +313,23 @@ private Map<String, Object> generateInstructionEvent(Map<String, Object> certifi
String beJobRequestEvent = LogTelemetryEventUtil.logInstructionEvent(actor, context, object, edata);
return mapper.readValue(beJobRequestEvent, new TypeReference<Map<String, Object>>() {});
}

private Map<String, Object> readQuery(Session cassandraSession, Map<String, Object> dataToSelect) {
String query = "SELECT status, completedOn FROM " + keyspace +"." + table +
" where courseid='" + dataToSelect.get("courseid") + "' AND batchid='" + dataToSelect.get("batchid") + "' AND userid='" + dataToSelect.get("userid") + "';";
LOGGER.info("CourseBatchUpdater:readQuery: started + query " + query);
ResultSet resultSet = SunbirdCassandraUtil.execute(cassandraSession, query);
Iterator<Row> rows = resultSet.iterator();
LOGGER.info("CourseBatchUpdater:readQuery: rows.hasNext():: " + rows.hasNext() + " rows : " + rows);
Map<String, Object> result = new HashMap<>();
while(rows.hasNext()) {
Row row = rows.next();
LOGGER.info("CourseBatchUpdater:readQuery: row " + row);
result.put("status", row.getInt("status"));
LOGGER.info("CourseBatchUpdater:readQuery: row.getTimestamp(\"completedOn\"):: " + row.getTimestamp("completedOn"));
result.put("completedOn", row.getTimestamp("completedOn"));
}
LOGGER.info("CourseBatchUpdater:readQuery: completed : result :- " + result);
return result;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.sunbird.jobs.service.util;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import org.apache.samza.config.Config;
import org.apache.samza.system.SystemStream;
Expand All @@ -21,22 +23,33 @@
import org.sunbird.jobs.samza.service.util.CourseBatchUpdater;
import org.sunbird.jobs.samza.task.CourseProgressHandler;
import org.sunbird.jobs.samza.util.RedisConnect;
import org.sunbird.jobs.samza.util.SunbirdCassandraUtil;
import redis.clients.jedis.Jedis;

import java.lang.reflect.Method;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Iterator;

import static org.mockito.Matchers.anyString;

@Ignore
@RunWith(PowerMockRunner.class)
@PrepareForTest({CourseBatchUpdater.class, BaseCourseBatchUpdater.class})
@PrepareForTest({CourseBatchUpdater.class, BaseCourseBatchUpdater.class, SunbirdCassandraUtil.class, Row.class, ResultSet.class, Iterator.class})
@PowerMockIgnore({"javax.management.*", "sun.security.ssl.*", "javax.net.ssl.*" , "javax.crypto.*"})
public class CourseBatchUpdaterTest {

@Mock
private ResultSet resultSet;
@Mock
private Iterator<Row> iterator;
@Mock
private Row row;
private CourseBatchUpdater courseBatchUpdater;

private MessageCollector collectorMock;
@Mock(name = "certificateInstructionStream")
SystemStream certificateInstructionStream = new SystemStream("kafka", Platform.config.getString("course.batch.certificate.topic"));
Expand Down Expand Up @@ -77,41 +90,27 @@ public void testUpdatebatchStatus() throws Exception {

@Test
public void testPushCertificateEvents() {
CourseProgressHandler courseProgressHandler = new CourseProgressHandler();
Config config = Mockito.mock(Config.class);
Mockito.when(config.get("redis.host")).thenReturn("localhost");
Mockito.when(config.getInt("redis.port")).thenReturn(6379);
Jedis redisConnect = new RedisConnect(config).getConnection();
Session session = Mockito.mock(Session.class);
SystemStream certificateInstructionStream = new SystemStream("kafka", "coursebatch.certificate.request");
CourseBatchUpdater updater = PowerMockito.spy(new CourseBatchUpdater(redisConnect, session, certificateInstructionStream));

mockAllSession();
List<Map<String, Object>> userCertificateEvents = new ArrayList<Map<String, Object>>(){{
add(new HashMap<String, Object>(){{
put("batchId", "0128057392291102720");
put("userId", "874ed8a5-782e-4f6c-8f36-e0288455901e");
put("courseId", "do_1127212344324751361295");
}});
}};
updater.pushCertificateEvents(userCertificateEvents, collectorMock);
courseBatchUpdater.pushCertificateEvents(userCertificateEvents, collectorMock);
}

@Test
public void testgenerateInstructionEvent() throws Exception{
Config config = Mockito.mock(Config.class);
Mockito.when(config.get("redis.host")).thenReturn("localhost");
Mockito.when(config.getInt("redis.port")).thenReturn(6379);
Jedis redisConnect = new RedisConnect(config).getConnection();
Session session = Mockito.mock(Session.class);
SystemStream certificateInstructionStream = new SystemStream("kafka", "coursebatch.certificate.request");
CourseBatchUpdater updater = PowerMockito.spy(new CourseBatchUpdater(redisConnect, session, certificateInstructionStream));
mockAllSession();
Method generateInstructionEventMethod = CourseBatchUpdater.class.getDeclaredMethod("generateInstructionEvent", Map.class);
generateInstructionEventMethod.setAccessible(true);
Map<String, Object> certificateEvent = new HashMap<>();
certificateEvent.put("batchId", "0128057392291102720");
certificateEvent.put("userId", "874ed8a5-782e-4f6c-8f36-e0288455901e");
certificateEvent.put("courseId", "do_1127212344324751361295");
Map<String, Object> updatedCertificateEvent = (Map<String, Object>) generateInstructionEventMethod.invoke(updater, certificateEvent);
Map<String, Object> updatedCertificateEvent = (Map<String, Object>) generateInstructionEventMethod.invoke(courseBatchUpdater, certificateEvent);
Assert.assertTrue(updatedCertificateEvent.containsKey("actor"));
Assert.assertTrue(updatedCertificateEvent.containsKey("edata"));
Map<String, Object> event = (Map<String, Object>) updatedCertificateEvent.get("edata");
Expand All @@ -121,4 +120,42 @@ public void testgenerateInstructionEvent() throws Exception{
Assert.assertTrue(updatedCertificateEvent.containsKey("eid") && updatedCertificateEvent.containsValue("BE_JOB_REQUEST"));
Assert.assertTrue(updatedCertificateEvent.containsKey("mid"));
}

@Test
public void testReadQuery() throws Exception{
mockAllSession();
Method generateInstructionEventMethod = CourseBatchUpdater.class.getDeclaredMethod("readQuery", Session.class, Map.class);
generateInstructionEventMethod.setAccessible(true);
Map<String, Object> dataToSelect = new HashMap<String, Object>(){{
put("batchid", "0128057392291102720");
put("userid", "874ed8a5-782e-4f6c-8f36-e0288455901e");
put("courseid", "do_1127212344324751361295");
}};
mockForReadQuery();
PowerMockito.when(row.getInt("status")).thenReturn(1);
Timestamp timestamp = new Timestamp(System.currentTimeMillis());
PowerMockito.when(row.getTimestamp("completedOn")).thenReturn(timestamp);
Session session = Mockito.mock(Session.class);
Map<String, Object> result = (Map<String, Object>) generateInstructionEventMethod.invoke(courseBatchUpdater, session, dataToSelect);
Assert.assertTrue(((Number) result.get("status")).intValue() != 0);
Assert.assertTrue((result.get("completedOn") != null));
}

public void mockForReadQuery(){
PowerMockito.stub(PowerMockito.method(SunbirdCassandraUtil.class, "execute")).toReturn(resultSet);
PowerMockito.when(resultSet.iterator()).thenReturn(iterator);
// check in method it is calling two time or not
PowerMockito.when(iterator.hasNext()).thenReturn(true).thenReturn(true).thenReturn(false);
PowerMockito.when(iterator.next()).thenReturn(row);
}

public void mockAllSession(){
Config config = Mockito.mock(Config.class);
Mockito.when(config.get("redis.host")).thenReturn("localhost");
Mockito.when(config.getInt("redis.port")).thenReturn(6379);
Jedis redisConnect = new RedisConnect(config).getConnection();
Session session = Mockito.mock(Session.class);
SystemStream certificateInstructionStream = new SystemStream("kafka", "coursebatch.certificate.request");
courseBatchUpdater = PowerMockito.spy(new CourseBatchUpdater(redisConnect, session, certificateInstructionStream));
}
}
2 changes: 1 addition & 1 deletion platform-jobs/samza/distribution/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
<dependency>
<groupId>org.ekstep</groupId>
<artifactId>course-batch-updater</artifactId>
<version>0.0.45</version>
<version>0.0.48</version>
<type>tar.gz</type>
<classifier>distribution</classifier>
</dependency>
Expand Down

0 comments on commit caf295f

Please sign in to comment.