977 lines
37 KiB
C++
977 lines
37 KiB
C++
/**
|
|
* Licensed to the Apache Software Foundation (ASF) under one
|
|
* or more contributor license agreements. See the NOTICE file
|
|
* distributed with this work for additional information
|
|
* regarding copyright ownership. The ASF licenses this file
|
|
* to you under the Apache License, Version 2.0 (the
|
|
* "License"); you may not use this file except in compliance
|
|
* with the License. You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
#include <cppunit/extensions/HelperMacros.h>
|
|
#include "CppAssertHelper.h"
|
|
|
|
#include "ZKMocks.h"
|
|
#include <proto.h>
|
|
|
|
using namespace std;
|
|
|
|
class Zookeeper_operations : public CPPUNIT_NS::TestFixture
|
|
{
|
|
CPPUNIT_TEST_SUITE(Zookeeper_operations);
|
|
#ifndef THREADED
|
|
CPPUNIT_TEST(testPing);
|
|
CPPUNIT_TEST(testUnsolicitedPing);
|
|
CPPUNIT_TEST(testTimeoutCausedByWatches1);
|
|
CPPUNIT_TEST(testTimeoutCausedByWatches2);
|
|
CPPUNIT_TEST(testCloseWhileInProgressFromMain);
|
|
CPPUNIT_TEST(testCloseWhileInProgressFromCompletion);
|
|
CPPUNIT_TEST(testCloseWhileMultiInProgressFromMain);
|
|
CPPUNIT_TEST(testCloseWhileMultiInProgressFromCompletion);
|
|
#else
|
|
CPPUNIT_TEST(testAsyncWatcher1);
|
|
CPPUNIT_TEST(testAsyncGetOperation);
|
|
#endif
|
|
CPPUNIT_TEST(testOperationsAndDisconnectConcurrently1);
|
|
CPPUNIT_TEST(testOperationsAndDisconnectConcurrently2);
|
|
CPPUNIT_TEST(testConcurrentOperations1);
|
|
CPPUNIT_TEST_SUITE_END();
|
|
zhandle_t *zh;
|
|
FILE *logfile;
|
|
|
|
static void watcher(zhandle_t *, int, int, const char *,void*){}
|
|
public:
|
|
Zookeeper_operations() {
|
|
logfile = openlogfile("Zookeeper_operations");
|
|
}
|
|
|
|
~Zookeeper_operations() {
|
|
if (logfile) {
|
|
fflush(logfile);
|
|
fclose(logfile);
|
|
logfile = 0;
|
|
}
|
|
}
|
|
|
|
void setUp()
|
|
{
|
|
zoo_set_log_stream(logfile);
|
|
|
|
zoo_deterministic_conn_order(0);
|
|
zh=0;
|
|
}
|
|
|
|
void tearDown()
|
|
{
|
|
zookeeper_close(zh);
|
|
}
|
|
|
|
class AsyncGetOperationCompletion: public AsyncCompletion{
|
|
public:
|
|
AsyncGetOperationCompletion():called_(false),rc_(ZAPIERROR){}
|
|
virtual void dataCompl(int rc, const char *value, int len, const Stat *stat){
|
|
synchronized(mx_);
|
|
called_=true;
|
|
rc_=rc;
|
|
value_.erase();
|
|
if(rc!=ZOK) return;
|
|
value_.assign(value,len);
|
|
if(stat)
|
|
stat_=*stat;
|
|
}
|
|
bool operator()()const{
|
|
synchronized(mx_);
|
|
return called_;
|
|
}
|
|
mutable Mutex mx_;
|
|
bool called_;
|
|
int rc_;
|
|
string value_;
|
|
NodeStat stat_;
|
|
};
|
|
|
|
class AsyncVoidOperationCompletion: public AsyncCompletion{
|
|
public:
|
|
AsyncVoidOperationCompletion():called_(false),rc_(ZAPIERROR){}
|
|
virtual void voidCompl(int rc){
|
|
synchronized(mx_);
|
|
called_=true;
|
|
rc_=rc;
|
|
}
|
|
bool operator()()const{
|
|
synchronized(mx_);
|
|
return called_;
|
|
}
|
|
mutable Mutex mx_;
|
|
bool called_;
|
|
int rc_;
|
|
};
|
|
#ifndef THREADED
|
|
// send two get data requests; verify that the corresponding completions called
|
|
void testConcurrentOperations1()
|
|
{
|
|
Mock_gettimeofday timeMock;
|
|
ZookeeperServer zkServer;
|
|
// must call zookeeper_close() while all the mocks are in scope
|
|
CloseFinally guard(&zh);
|
|
|
|
zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
|
|
CPPUNIT_ASSERT(zh!=0);
|
|
// simulate connected state
|
|
forceConnected(zh);
|
|
|
|
int fd=0;
|
|
int interest=0;
|
|
timeval tv;
|
|
// first operation
|
|
AsyncGetOperationCompletion res1;
|
|
zkServer.addOperationResponse(new ZooGetResponse("1",1));
|
|
int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
// second operation
|
|
AsyncGetOperationCompletion res2;
|
|
zkServer.addOperationResponse(new ZooGetResponse("2",1));
|
|
rc=zoo_aget(zh,"/x/y/2",0,asyncCompletion,&res2);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
// process the send queue
|
|
rc=zookeeper_interest(zh,&fd,&interest,&tv);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
while((rc=zookeeper_process(zh,interest))==ZOK) {
|
|
millisleep(100);
|
|
//printf("%d\n", rc);
|
|
}
|
|
//printf("RC = %d", rc);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
|
|
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,res1.rc_);
|
|
CPPUNIT_ASSERT_EQUAL(string("1"),res1.value_);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,res2.rc_);
|
|
CPPUNIT_ASSERT_EQUAL(string("2"),res2.value_);
|
|
}
|
|
// send two getData requests and disconnect while the second request is
|
|
// outstanding;
|
|
// verify the completions are called
|
|
void testOperationsAndDisconnectConcurrently1()
|
|
{
|
|
Mock_gettimeofday timeMock;
|
|
ZookeeperServer zkServer;
|
|
// must call zookeeper_close() while all the mocks are in scope
|
|
CloseFinally guard(&zh);
|
|
|
|
zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
|
|
CPPUNIT_ASSERT(zh!=0);
|
|
// simulate connected state
|
|
forceConnected(zh);
|
|
|
|
int fd=0;
|
|
int interest=0;
|
|
timeval tv;
|
|
// first operation
|
|
AsyncGetOperationCompletion res1;
|
|
zkServer.addOperationResponse(new ZooGetResponse("1",1));
|
|
int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
// second operation
|
|
AsyncGetOperationCompletion res2;
|
|
zkServer.addOperationResponse(new ZooGetResponse("2",1));
|
|
rc=zoo_aget(zh,"/x/y/2",0,asyncCompletion,&res2);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
// process the send queue
|
|
rc=zookeeper_interest(zh,&fd,&interest,&tv);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
rc=zookeeper_process(zh,interest);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
// simulate a disconnect
|
|
zkServer.setConnectionLost();
|
|
rc=zookeeper_interest(zh,&fd,&interest,&tv);
|
|
rc=zookeeper_process(zh,interest);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZCONNECTIONLOSS,rc);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,res1.rc_);
|
|
CPPUNIT_ASSERT_EQUAL(string("1"),res1.value_);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZCONNECTIONLOSS,res2.rc_);
|
|
CPPUNIT_ASSERT_EQUAL(string(""),res2.value_);
|
|
}
|
|
// send two getData requests and simulate timeout while the both request
|
|
// are pending;
|
|
// verify the completions are called
|
|
void testOperationsAndDisconnectConcurrently2()
|
|
{
|
|
Mock_gettimeofday timeMock;
|
|
ZookeeperServer zkServer;
|
|
// must call zookeeper_close() while all the mocks are in scope
|
|
CloseFinally guard(&zh);
|
|
|
|
zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
|
|
CPPUNIT_ASSERT(zh!=0);
|
|
// simulate connected state
|
|
forceConnected(zh);
|
|
|
|
int fd=0;
|
|
int interest=0;
|
|
timeval tv;
|
|
// first operation
|
|
AsyncGetOperationCompletion res1;
|
|
zkServer.addOperationResponse(new ZooGetResponse("1",1));
|
|
int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
// second operation
|
|
AsyncGetOperationCompletion res2;
|
|
zkServer.addOperationResponse(new ZooGetResponse("2",1));
|
|
rc=zoo_aget(zh,"/x/y/2",0,asyncCompletion,&res2);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
// simulate timeout
|
|
timeMock.tick(+10); // advance system time by 10 secs
|
|
// the next call to zookeeper_interest should return ZOPERATIONTIMEOUT
|
|
rc=zookeeper_interest(zh,&fd,&interest,&tv);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOPERATIONTIMEOUT,rc);
|
|
// make sure the completions have been called
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOPERATIONTIMEOUT,res1.rc_);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOPERATIONTIMEOUT,res2.rc_);
|
|
}
|
|
|
|
class PingCountingServer: public ZookeeperServer{
|
|
public:
|
|
PingCountingServer():pingCount_(0){}
|
|
// called when a client request is received
|
|
virtual void onMessageReceived(const RequestHeader& rh, iarchive* ia){
|
|
if(rh.type==ZOO_PING_OP){
|
|
pingCount_++;
|
|
}
|
|
}
|
|
int pingCount_;
|
|
};
|
|
|
|
// establish a connection; idle for a while
|
|
// verify ping was sent at least once
|
|
void testPing()
|
|
{
|
|
const int TIMEOUT=9; // timeout in secs
|
|
Mock_gettimeofday timeMock;
|
|
PingCountingServer zkServer;
|
|
// must call zookeeper_close() while all the mocks are in scope
|
|
CloseFinally guard(&zh);
|
|
|
|
// receive timeout is in milliseconds
|
|
zh=zookeeper_init("localhost:1234",watcher,TIMEOUT*1000,TEST_CLIENT_ID,0,0);
|
|
CPPUNIT_ASSERT(zh!=0);
|
|
// simulate connected state
|
|
forceConnected(zh);
|
|
|
|
int fd=0;
|
|
int interest=0;
|
|
timeval tv;
|
|
// Round 1.
|
|
int rc=zookeeper_interest(zh,&fd,&interest,&tv);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
// simulate waiting for the select() call to timeout;
|
|
// advance the system clock accordingly
|
|
timeMock.tick(tv);
|
|
rc=zookeeper_process(zh,interest);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
|
|
// verify no ping sent
|
|
CPPUNIT_ASSERT(zkServer.pingCount_==0);
|
|
|
|
// Round 2.
|
|
// the client should have the idle threshold exceeded, by now
|
|
rc=zookeeper_interest(zh,&fd,&interest,&tv);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
// assume the socket is writable, so no idling here; move on to
|
|
// zookeeper_process immediately
|
|
rc=zookeeper_process(zh,interest);
|
|
// ZNOTHING means the client hasn't received a ping response yet
|
|
CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
|
|
// verify a ping is sent
|
|
CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);
|
|
|
|
// Round 3.
|
|
// we're going to receive a server PING response and make sure
|
|
// that the client has updated its last_recv timestamp
|
|
zkServer.addRecvResponse(new PingResponse);
|
|
rc=zookeeper_interest(zh,&fd,&interest,&tv);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
// pseudo-sleep for a short while (10 ms)
|
|
timeMock.millitick(10);
|
|
rc=zookeeper_process(zh,interest);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
// only one ping so far?
|
|
CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);
|
|
CPPUNIT_ASSERT(timeMock==zh->last_recv);
|
|
|
|
// Round 4
|
|
// make sure that a ping is not sent if something is outstanding
|
|
AsyncGetOperationCompletion res1;
|
|
rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
rc=zookeeper_interest(zh,&fd,&interest,&tv);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
timeMock.tick(tv);
|
|
rc=zookeeper_interest(zh,&fd,&interest,&tv);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
rc=zookeeper_process(zh,interest);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
|
|
rc=zookeeper_interest(zh,&fd,&interest,&tv);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
// pseudo-sleep for a short while (10 ms)
|
|
timeMock.millitick(10);
|
|
rc=zookeeper_process(zh,interest);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
|
|
// only one ping so far?
|
|
CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);
|
|
}
|
|
|
|
// ZOOKEEPER-2253: Permit unsolicited pings
|
|
void testUnsolicitedPing()
|
|
{
|
|
const int TIMEOUT=9; // timeout in secs
|
|
Mock_gettimeofday timeMock;
|
|
PingCountingServer zkServer;
|
|
// must call zookeeper_close() while all the mocks are in scope
|
|
CloseFinally guard(&zh);
|
|
|
|
// receive timeout is in milliseconds
|
|
zh=zookeeper_init("localhost:1234",watcher,TIMEOUT*1000,TEST_CLIENT_ID,0,0);
|
|
CPPUNIT_ASSERT(zh!=0);
|
|
// simulate connected state
|
|
forceConnected(zh);
|
|
|
|
int fd=0;
|
|
int interest=0;
|
|
timeval tv;
|
|
|
|
int rc=zookeeper_interest(zh,&fd,&interest,&tv);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
|
|
// verify no ping sent
|
|
CPPUNIT_ASSERT(zkServer.pingCount_==0);
|
|
|
|
// we're going to receive a unsolicited PING response; ensure
|
|
// that the client has updated its last_recv timestamp
|
|
timeMock.tick(tv);
|
|
zkServer.addRecvResponse(new PingResponse);
|
|
rc=zookeeper_process(zh,interest);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
CPPUNIT_ASSERT(timeMock==zh->last_recv);
|
|
}
|
|
|
|
// simulate a watch arriving right before a ping is due
|
|
// assert the ping is sent nevertheless
|
|
void testTimeoutCausedByWatches1()
|
|
{
|
|
const int TIMEOUT=9; // timeout in secs
|
|
Mock_gettimeofday timeMock;
|
|
PingCountingServer zkServer;
|
|
// must call zookeeper_close() while all the mocks are in scope
|
|
CloseFinally guard(&zh);
|
|
|
|
// receive timeout is in milliseconds
|
|
zh=zookeeper_init("localhost:1234",watcher,TIMEOUT*1000,TEST_CLIENT_ID,0,0);
|
|
CPPUNIT_ASSERT(zh!=0);
|
|
// simulate connected state
|
|
forceConnected(zh);
|
|
|
|
int fd=0;
|
|
int interest=0;
|
|
timeval tv;
|
|
// Round 1.
|
|
int rc=zookeeper_interest(zh,&fd,&interest,&tv);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
// simulate waiting for the select() call to timeout;
|
|
// advance the system clock accordingly
|
|
timeMock.tick(tv);
|
|
timeMock.tick(-1); // set the clock to a millisecond before a ping is due
|
|
// trigger a watch now
|
|
zkServer.addRecvResponse(new ZNodeEvent(ZOO_CHANGED_EVENT,"/x/y/z"));
|
|
rc=zookeeper_process(zh,interest);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
// arrival of a watch sets the last_recv to the current time
|
|
CPPUNIT_ASSERT(timeMock==zh->last_recv);
|
|
// spend 1 millisecond by processing the watch
|
|
timeMock.tick(1);
|
|
|
|
// Round 2.
|
|
// a ping is due; zookeeper_interest() must send it now
|
|
rc=zookeeper_interest(zh,&fd,&interest,&tv);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
// no delay here -- as if the socket is immediately writable
|
|
rc=zookeeper_process(zh,interest);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
|
|
// verify a ping is sent
|
|
CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);
|
|
}
|
|
|
|
// similar to testTimeoutCausedByWatches1, but this time the watch is
|
|
// triggered while the client has an outstanding request
|
|
// assert the ping is sent on time
|
|
void testTimeoutCausedByWatches2()
|
|
{
|
|
const int TIMEOUT=9; // timeout in secs
|
|
Mock_gettimeofday now;
|
|
PingCountingServer zkServer;
|
|
// must call zookeeper_close() while all the mocks are in scope
|
|
CloseFinally guard(&zh);
|
|
|
|
// receive timeout is in milliseconds
|
|
zh=zookeeper_init("localhost:1234",watcher,TIMEOUT*1000,TEST_CLIENT_ID,0,0);
|
|
CPPUNIT_ASSERT(zh!=0);
|
|
// simulate connected state
|
|
forceConnected(zh);
|
|
|
|
// queue up a request; keep it pending (as if the server is busy or has died)
|
|
AsyncGetOperationCompletion res1;
|
|
zkServer.addOperationResponse(new ZooGetResponse("2",1));
|
|
int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
|
|
|
|
int fd=0;
|
|
int interest=0;
|
|
timeval tv;
|
|
// Round 1.
|
|
// send the queued up zoo_aget() request
|
|
Mock_gettimeofday beginningOfTimes(now); // remember when we started
|
|
rc=zookeeper_interest(zh,&fd,&interest,&tv);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
// no delay -- the socket is writable
|
|
rc=zookeeper_process(zh,interest);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
|
|
// Round 2.
|
|
// what's next?
|
|
rc=zookeeper_interest(zh,&fd,&interest,&tv);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
// no response from the server yet -- waiting in the select() call
|
|
now.tick(tv);
|
|
// a watch has arrived, thus preventing the connection from timing out
|
|
zkServer.addRecvResponse(new ZNodeEvent(ZOO_CHANGED_EVENT,"/x/y/z"));
|
|
rc=zookeeper_process(zh,interest);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc); // read the watch message
|
|
CPPUNIT_ASSERT_EQUAL(0,zkServer.pingCount_); // not yet!
|
|
|
|
//Round 3.
|
|
// now is the time to send a ping; make sure it's actually sent
|
|
rc=zookeeper_interest(zh,&fd,&interest,&tv);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
rc=zookeeper_process(zh,interest);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
|
|
// verify a ping is sent
|
|
CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);
|
|
// make sure only 1/3 of the timeout has passed
|
|
CPPUNIT_ASSERT_EQUAL((int32_t)TIMEOUT/3*1000,toMilliseconds(now-beginningOfTimes));
|
|
}
|
|
|
|
// ZOOKEEPER-2894: Memory and completions leak on zookeeper_close
|
|
// while there is a request waiting for being processed
|
|
// call zookeeper_close() from the main event loop
|
|
// assert the completion callback is called
|
|
void testCloseWhileInProgressFromMain()
|
|
{
|
|
Mock_gettimeofday timeMock;
|
|
ZookeeperServer zkServer;
|
|
CloseFinally guard(&zh);
|
|
|
|
zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
|
|
CPPUNIT_ASSERT(zh!=0);
|
|
forceConnected(zh);
|
|
zhandle_t* savezh=zh;
|
|
|
|
// issue a request
|
|
zkServer.addOperationResponse(new ZooGetResponse("1",1));
|
|
AsyncGetOperationCompletion res1;
|
|
int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
|
|
// but do not allow Zookeeper C Client to process the request
|
|
// and call zookeeper_close() from the main event loop immediately
|
|
Mock_free_noop freeMock;
|
|
rc=zookeeper_close(zh); zh=0;
|
|
freeMock.disable();
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
|
|
// verify that memory for completions was freed (would be freed if no mock installed)
|
|
CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
|
|
CPPUNIT_ASSERT(savezh->completions_to_process.head==0);
|
|
CPPUNIT_ASSERT(savezh->completions_to_process.last==0);
|
|
|
|
// verify that completion was called, and it was called with ZCLOSING status
|
|
CPPUNIT_ASSERT(res1.called_);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZCLOSING,res1.rc_);
|
|
}
|
|
|
|
// ZOOKEEPER-2894: Memory and completions leak on zookeeper_close
|
|
// send some request #1
|
|
// then, while there is a request #2 waiting for being processed
|
|
// call zookeeper_close() from the completion callback of request #1
|
|
// assert the completion callback #2 is called
|
|
void testCloseWhileInProgressFromCompletion()
|
|
{
|
|
Mock_gettimeofday timeMock;
|
|
ZookeeperServer zkServer;
|
|
CloseFinally guard(&zh);
|
|
|
|
zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
|
|
CPPUNIT_ASSERT(zh!=0);
|
|
forceConnected(zh);
|
|
zhandle_t* savezh=zh;
|
|
|
|
// will handle completion on request #1 and issue request #2 from it
|
|
class AsyncGetOperationCompletion1: public AsyncCompletion{
|
|
public:
|
|
AsyncGetOperationCompletion1(zhandle_t **zh, ZookeeperServer *zkServer,
|
|
AsyncGetOperationCompletion *res2)
|
|
:zh_(zh),zkServer_(zkServer),res2_(res2){}
|
|
|
|
virtual void dataCompl(int rc1, const char *value, int len, const Stat *stat){
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc1);
|
|
|
|
// from the completion #1 handler, issue request #2
|
|
zkServer_->addOperationResponse(new ZooGetResponse("2",1));
|
|
int rc2=zoo_aget(*zh_,"/x/y/2",0,asyncCompletion,res2_);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc2);
|
|
|
|
// but do not allow Zookeeper C Client to process the request #2
|
|
// and call zookeeper_close() from the completion callback of request #1
|
|
rc2=zookeeper_close(*zh_); *zh_=0;
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc2);
|
|
|
|
// do not disable freeMock here, let completion #2 handler
|
|
// return through ZooKeeper C Client internals to the main loop
|
|
// and fulfill the work
|
|
}
|
|
|
|
zhandle_t **zh_;
|
|
ZookeeperServer *zkServer_;
|
|
AsyncGetOperationCompletion *res2_;
|
|
};
|
|
|
|
// issue request #1
|
|
AsyncGetOperationCompletion res2;
|
|
AsyncGetOperationCompletion1 res1(&zh,&zkServer,&res2);
|
|
zkServer.addOperationResponse(new ZooGetResponse("1",1));
|
|
int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
|
|
// process the send queue
|
|
int fd; int interest; timeval tv;
|
|
rc=zookeeper_interest(zh,&fd,&interest,&tv);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
CPPUNIT_ASSERT(zh!=0);
|
|
Mock_free_noop freeMock;
|
|
while(zh!=0 && (rc=zookeeper_process(zh,interest))==ZOK) {
|
|
millisleep(100);
|
|
}
|
|
freeMock.disable();
|
|
CPPUNIT_ASSERT(zh==0);
|
|
|
|
// verify that memory for completions was freed (would be freed if no mock installed)
|
|
CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
|
|
CPPUNIT_ASSERT(savezh->completions_to_process.head==0);
|
|
CPPUNIT_ASSERT(savezh->completions_to_process.last==0);
|
|
|
|
// verify that completion #2 was called, and it was called with ZCLOSING status
|
|
CPPUNIT_ASSERT(res2.called_);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZCLOSING,res2.rc_);
|
|
}
|
|
|
|
// ZOOKEEPER-2891: Invalid processing of zookeeper_close for mutli-request
|
|
// while there is a multi request waiting for being processed
|
|
// call zookeeper_close() from the main event loop
|
|
// assert the completion callback is called with status ZCLOSING
|
|
void testCloseWhileMultiInProgressFromMain()
|
|
{
|
|
Mock_gettimeofday timeMock;
|
|
ZookeeperServer zkServer;
|
|
CloseFinally guard(&zh);
|
|
|
|
zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
|
|
CPPUNIT_ASSERT(zh!=0);
|
|
forceConnected(zh);
|
|
zhandle_t* savezh=zh;
|
|
|
|
// issue a multi request
|
|
int nops=2;
|
|
zoo_op_t ops[nops];
|
|
zoo_op_result_t results[nops];
|
|
zoo_create_op_init(&ops[0],"/a",0,-1,&ZOO_OPEN_ACL_UNSAFE,0,0,0);
|
|
zoo_create_op_init(&ops[1],"/a/b",0,-1,&ZOO_OPEN_ACL_UNSAFE,0,0,0);
|
|
// TODO: Provide ZooMultiResponse. However, it's not required in this test.
|
|
// zkServer.addOperationResponse(new ZooMultiResponse(...));
|
|
AsyncVoidOperationCompletion res1;
|
|
int rc=zoo_amulti(zh,nops,ops,results,asyncCompletion,&res1);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
|
|
// but do not allow Zookeeper C Client to process the request
|
|
// and call zookeeper_close() from the main event loop immediately
|
|
Mock_free_noop freeMock;
|
|
rc=zookeeper_close(zh); zh=0;
|
|
freeMock.disable();
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
|
|
// verify that memory for completions was freed (would be freed if no mock installed)
|
|
CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
|
|
CPPUNIT_ASSERT(savezh->completions_to_process.head==0);
|
|
CPPUNIT_ASSERT(savezh->completions_to_process.last==0);
|
|
|
|
// verify that completion was called, and it was called with ZCLOSING status
|
|
CPPUNIT_ASSERT(res1.called_);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZCLOSING,res1.rc_);
|
|
}
|
|
|
|
// ZOOKEEPER-2891: Invalid processing of zookeeper_close for mutli-request
|
|
// send some request #1 (not a multi request)
|
|
// then, while there is a multi request #2 waiting for being processed
|
|
// call zookeeper_close() from the completion callback of request #1
|
|
// assert the completion callback #2 is called with status ZCLOSING
|
|
void testCloseWhileMultiInProgressFromCompletion()
|
|
{
|
|
Mock_gettimeofday timeMock;
|
|
ZookeeperServer zkServer;
|
|
CloseFinally guard(&zh);
|
|
|
|
zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
|
|
CPPUNIT_ASSERT(zh!=0);
|
|
forceConnected(zh);
|
|
zhandle_t* savezh=zh;
|
|
|
|
// these shall persist during the test
|
|
int nops=2;
|
|
zoo_op_t ops[nops];
|
|
zoo_op_result_t results[nops];
|
|
|
|
// will handle completion on request #1 and issue request #2 from it
|
|
class AsyncGetOperationCompletion1: public AsyncCompletion{
|
|
public:
|
|
AsyncGetOperationCompletion1(zhandle_t **zh, ZookeeperServer *zkServer,
|
|
AsyncVoidOperationCompletion *res2,
|
|
int nops, zoo_op_t* ops, zoo_op_result_t* results)
|
|
:zh_(zh),zkServer_(zkServer),res2_(res2),nops_(nops),ops_(ops),results_(results){}
|
|
|
|
virtual void dataCompl(int rc1, const char *value, int len, const Stat *stat){
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc1);
|
|
|
|
// from the completion #1 handler, issue multi request #2
|
|
assert(nops_>=2);
|
|
zoo_create_op_init(&ops_[0],"/a",0,-1,&ZOO_OPEN_ACL_UNSAFE,0,0,0);
|
|
zoo_create_op_init(&ops_[1],"/a/b",0,-1,&ZOO_OPEN_ACL_UNSAFE,0,0,0);
|
|
// TODO: Provide ZooMultiResponse. However, it's not required in this test.
|
|
// zkServer_->addOperationResponse(new ZooMultiResponse(...));
|
|
int rc2=zoo_amulti(*zh_,nops_,ops_,results_,asyncCompletion,res2_);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc2);
|
|
|
|
// but do not allow Zookeeper C Client to process the request #2
|
|
// and call zookeeper_close() from the completion callback of request #1
|
|
rc2=zookeeper_close(*zh_); *zh_=0;
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc2);
|
|
|
|
// do not disable freeMock here, let completion #2 handler
|
|
// return through ZooKeeper C Client internals to the main loop
|
|
// and fulfill the work
|
|
}
|
|
|
|
zhandle_t **zh_;
|
|
ZookeeperServer *zkServer_;
|
|
AsyncVoidOperationCompletion *res2_;
|
|
int nops_;
|
|
zoo_op_t* ops_;
|
|
zoo_op_result_t* results_;
|
|
};
|
|
|
|
// issue some request #1 (not a multi request)
|
|
AsyncVoidOperationCompletion res2;
|
|
AsyncGetOperationCompletion1 res1(&zh,&zkServer,&res2,nops,ops,results);
|
|
zkServer.addOperationResponse(new ZooGetResponse("1",1));
|
|
int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
|
|
// process the send queue
|
|
int fd; int interest; timeval tv;
|
|
rc=zookeeper_interest(zh,&fd,&interest,&tv);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
CPPUNIT_ASSERT(zh!=0);
|
|
Mock_free_noop freeMock;
|
|
while(zh!=0 && (rc=zookeeper_process(zh,interest))==ZOK) {
|
|
millisleep(100);
|
|
}
|
|
freeMock.disable();
|
|
CPPUNIT_ASSERT(zh==0);
|
|
|
|
// verify that memory for completions was freed (would be freed if no mock installed)
|
|
CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
|
|
CPPUNIT_ASSERT(savezh->completions_to_process.head==0);
|
|
CPPUNIT_ASSERT(savezh->completions_to_process.last==0);
|
|
|
|
// verify that completion #2 was called, and it was called with ZCLOSING status
|
|
CPPUNIT_ASSERT(res2.called_);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZCLOSING,res2.rc_);
|
|
}
|
|
|
|
#else
|
|
class TestGetDataJob: public TestJob{
|
|
public:
|
|
TestGetDataJob(ZookeeperServer* svr,zhandle_t* zh, int reps=500)
|
|
:svr_(svr),zh_(zh),rc_(ZAPIERROR),reps_(reps){}
|
|
virtual void run(){
|
|
int i;
|
|
for(i=0;i<reps_;i++){
|
|
char buf;
|
|
int size=sizeof(buf);
|
|
|
|
if (i % 10 == 0) {
|
|
// We need to pause every once in a while so we don't
|
|
// get too far ahead and finish before the disconnect
|
|
millisleep(1);
|
|
}
|
|
svr_->addOperationResponse(new ZooGetResponse("1",1));
|
|
rc_=zoo_get(zh_,"/x/y/z",0,&buf,&size,0);
|
|
if(rc_!=ZOK){
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
ZookeeperServer* svr_;
|
|
zhandle_t* zh_;
|
|
int rc_;
|
|
int reps_;
|
|
};
|
|
class TestConcurrentOpJob: public TestGetDataJob{
|
|
public:
|
|
static const int REPS=500;
|
|
TestConcurrentOpJob(ZookeeperServer* svr,zhandle_t* zh):
|
|
TestGetDataJob(svr,zh,REPS){}
|
|
virtual TestJob* clone() const {
|
|
return new TestConcurrentOpJob(svr_,zh_);
|
|
}
|
|
virtual void validate(const char* file, int line) const{
|
|
CPPUNIT_ASSERT_EQUAL_MESSAGE_LOC("ZOK != rc",(int)ZOK,rc_,file,line);
|
|
}
|
|
};
|
|
void testConcurrentOperations1()
|
|
{
|
|
for(int counter=0; counter<50; counter++){
|
|
// frozen time -- no timeouts and no pings
|
|
Mock_gettimeofday timeMock;
|
|
|
|
ZookeeperServer zkServer;
|
|
Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
|
|
// must call zookeeper_close() while all the mocks are in the scope!
|
|
CloseFinally guard(&zh);
|
|
|
|
zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
|
|
CPPUNIT_ASSERT(zh!=0);
|
|
// make sure the client has connected
|
|
CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
|
|
|
|
TestJobManager jmgr(TestConcurrentOpJob(&zkServer,zh),10);
|
|
jmgr.startAllJobs();
|
|
jmgr.wait();
|
|
// validate test results
|
|
VALIDATE_JOBS(jmgr);
|
|
}
|
|
}
|
|
class ZKGetJob: public TestJob{
|
|
public:
|
|
static const int REPS=1000;
|
|
ZKGetJob(zhandle_t* zh)
|
|
:zh_(zh),rc_(ZAPIERROR){}
|
|
virtual TestJob* clone() const {
|
|
return new ZKGetJob(zh_);
|
|
}
|
|
virtual void run(){
|
|
int i;
|
|
for(i=0;i<REPS;i++){
|
|
char buf;
|
|
int size=sizeof(buf);
|
|
rc_=zoo_get(zh_,"/xyz",0,&buf,&size,0);
|
|
if(rc_!=ZOK){
|
|
break;
|
|
}
|
|
}
|
|
//TEST_TRACE("Finished %d iterations",i);
|
|
}
|
|
virtual void validate(const char* file, int line) const{
|
|
CPPUNIT_ASSERT_EQUAL_MESSAGE_LOC("ZOK != rc",(int)ZOK,rc_,file,line);
|
|
}
|
|
zhandle_t* zh_;
|
|
int rc_;
|
|
};
|
|
|
|
// this test connects to a real ZK server and creates the /xyz node and sends
|
|
// lots of zoo_get requests.
|
|
// to run this test use the following command:
|
|
// zktest-mt Zookeeper_operations::testOperationsAndDisconnectConcurrently2 localhost:3181
|
|
// where the second parameter is the server host and port
|
|
void testOperationsAndDisconnectConcurrently2()
|
|
{
|
|
if(globalTestConfig.getTestName().find(__func__)==string::npos ||
|
|
globalTestConfig.getExtraOptCount()==0)
|
|
{
|
|
// only run this test when specifically asked so
|
|
return;
|
|
}
|
|
string host(*(globalTestConfig.getExtraOptBegin()));
|
|
zhandle_t* lzh=zookeeper_init(host.c_str(),watcher,10000,0,0,0);
|
|
CPPUNIT_ASSERT(lzh!=0);
|
|
// make sure the client has connected
|
|
CPPUNIT_ASSERT_MESSAGE("Unable to connect to the host",
|
|
ensureCondition(ClientConnected(zh),5000)<5000);
|
|
|
|
char realpath[1024];
|
|
int rc=zoo_create(lzh,"/xyz","1",1,&ZOO_OPEN_ACL_UNSAFE,0,realpath,sizeof(realpath)-1);
|
|
CPPUNIT_ASSERT(rc==ZOK || rc==ZNODEEXISTS);
|
|
zookeeper_close(lzh);
|
|
|
|
for(int counter=0; counter<200; counter++){
|
|
TEST_TRACE("Loop count %d",counter);
|
|
|
|
CloseFinally guard(&zh);
|
|
|
|
zh=zookeeper_init(host.c_str(),watcher,10000,0,0,0);
|
|
CPPUNIT_ASSERT(zh!=0);
|
|
// make sure the client has connected
|
|
CPPUNIT_ASSERT_MESSAGE("Unable to connect to the host",
|
|
ensureCondition(ClientConnected(zh),5000)<5000);
|
|
|
|
TestJobManager jmgr(ZKGetJob(zh),10);
|
|
jmgr.startJobsImmediately();
|
|
jmgr.wait();
|
|
VALIDATE_JOBS(jmgr);
|
|
TEST_TRACE("run %d finished",counter);
|
|
}
|
|
|
|
}
|
|
|
|
class TestConcurrentOpWithDisconnectJob: public TestGetDataJob{
|
|
public:
|
|
static const int REPS=1000;
|
|
TestConcurrentOpWithDisconnectJob(ZookeeperServer* svr,zhandle_t* zh):
|
|
TestGetDataJob(svr,zh,REPS){}
|
|
virtual TestJob* clone() const {
|
|
return new TestConcurrentOpWithDisconnectJob(svr_,zh_);
|
|
}
|
|
virtual void validate(const char* file, int line) const{
|
|
CPPUNIT_ASSERT_EQUAL_MESSAGE_LOC("ZCONNECTIONLOSS != rc",(int)ZCONNECTIONLOSS,rc_,file,line);
|
|
}
|
|
};
|
|
|
|
// this test is not 100% accurate in a sense it may not detect all error cases.
|
|
// TODO: I can't think of a test that is 100% accurate and doesn't interfere
|
|
// with the code being tested (in terms of introducing additional
|
|
// implicit synchronization points)
|
|
void testOperationsAndDisconnectConcurrently1()
|
|
{
|
|
for(int counter=0; counter<50; counter++){
|
|
//TEST_TRACE("Loop count %d",counter);
|
|
// frozen time -- no timeouts and no pings
|
|
Mock_gettimeofday timeMock;
|
|
|
|
ZookeeperServer zkServer;
|
|
Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
|
|
// must call zookeeper_close() while all the mocks are in the scope!
|
|
CloseFinally guard(&zh);
|
|
|
|
zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
|
|
CPPUNIT_ASSERT(zh!=0);
|
|
// make sure the client has connected
|
|
CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
|
|
|
|
TestJobManager jmgr(TestConcurrentOpWithDisconnectJob(&zkServer,zh),10);
|
|
jmgr.startJobsImmediately();
|
|
// let everything startup before we shutdown the server
|
|
millisleep(4);
|
|
// reconnect attempts will start failing immediately
|
|
zkServer.setServerDown(0);
|
|
// next recv call will return 0
|
|
zkServer.setConnectionLost();
|
|
jmgr.wait();
|
|
VALIDATE_JOBS(jmgr);
|
|
}
|
|
|
|
}
|
|
// call zoo_aget() in the multithreaded mode
|
|
void testAsyncGetOperation()
|
|
{
|
|
Mock_gettimeofday timeMock;
|
|
|
|
ZookeeperServer zkServer;
|
|
Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
|
|
// must call zookeeper_close() while all the mocks are in the scope!
|
|
CloseFinally guard(&zh);
|
|
|
|
zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
|
|
CPPUNIT_ASSERT(zh!=0);
|
|
// make sure the client has connected
|
|
CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
|
|
|
|
AsyncGetOperationCompletion res1;
|
|
zkServer.addOperationResponse(new ZooGetResponse("1",1));
|
|
int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
|
|
CPPUNIT_ASSERT(ensureCondition(res1,1000)<1000);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,res1.rc_);
|
|
CPPUNIT_ASSERT_EQUAL(string("1"),res1.value_);
|
|
}
|
|
class ChangeNodeWatcher: public WatcherAction{
|
|
public:
|
|
ChangeNodeWatcher():changed_(false){}
|
|
virtual void onNodeValueChanged(zhandle_t*,const char* path){
|
|
synchronized(mx_);
|
|
changed_=true;
|
|
if(path!=0) path_=path;
|
|
}
|
|
// this predicate checks if CHANGE_EVENT event type was triggered, unlike
|
|
// the isWatcherTriggered() that returns true whenever a watcher is triggered
|
|
// regardless of the event type
|
|
SyncedBoolCondition isNodeChangedTriggered() const{
|
|
return SyncedBoolCondition(changed_,mx_);
|
|
}
|
|
bool changed_;
|
|
string path_;
|
|
};
|
|
|
|
class AsyncWatcherCompletion: public AsyncCompletion{
|
|
public:
|
|
AsyncWatcherCompletion(ZookeeperServer& zkServer):zkServer_(zkServer){}
|
|
virtual void statCompl(int rc, const Stat *stat){
|
|
// we received a server response, now enqueue a watcher event
|
|
// to trigger the watcher
|
|
zkServer_.addRecvResponse(new ZNodeEvent(ZOO_CHANGED_EVENT,"/x/y/z"));
|
|
}
|
|
ZookeeperServer& zkServer_;
|
|
};
|
|
// verify that async watcher is called for znode events (CREATED, DELETED etc.)
|
|
void testAsyncWatcher1(){
|
|
Mock_gettimeofday timeMock;
|
|
|
|
ZookeeperServer zkServer;
|
|
Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
|
|
// must call zookeeper_close() while all the mocks are in the scope!
|
|
CloseFinally guard(&zh);
|
|
|
|
ChangeNodeWatcher action;
|
|
zh=zookeeper_init("localhost:2121",activeWatcher,10000,
|
|
TEST_CLIENT_ID,&action,0);
|
|
CPPUNIT_ASSERT(zh!=0);
|
|
// make sure the client has connected
|
|
CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
|
|
|
|
// set the watcher
|
|
AsyncWatcherCompletion completion(zkServer);
|
|
// prepare a response for the zoo_aexists() request
|
|
zkServer.addOperationResponse(new ZooStatResponse);
|
|
int rc=zoo_aexists(zh,"/x/y/z",1,asyncCompletion,&completion);
|
|
CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
|
|
|
|
CPPUNIT_ASSERT(ensureCondition(action.isNodeChangedTriggered(),1000)<1000);
|
|
CPPUNIT_ASSERT_EQUAL(string("/x/y/z"),action.path_);
|
|
}
|
|
#endif
|
|
};
|
|
|
|
CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_operations);
|