I have been developing my code in a virtual machine on my development box, but the intention was always to deploy the apps to the same Linux server MQTT is running on.
To this end I checked all the code into SVN and checked it out on the other machine. After the pain of remembering all the libraries to install again, and discovering the pain of hard-coded paths and command-line builds of Eclipse C++ projects, I finally got it running. Yay!
Issues
After running the app for a little while it would get into a state where it wasn’t publishing any more messages. I added some extra debug logging, which indicated that my publishing flag was not getting set back to false. The callback just logs a message and sets the variable to false, so it can’t be that. So some changes …
- I changed the code to make sure I only attempted to publish one message at a time. I have no idea whether you can have more than one outstanding at any given time, but having just one simplifies things. Nope, still problems.
- I changed the code again to only attempt publishing messages on the main thread – I was publishing the next message using the callback thread, maybe the library didn’t like this tactic? Nope still problems.
- I changed the code again to use an atomic<bool> thinking inter-thread issues – nope, still problems.
<hair pulling> – Arrg – It can’t be that hard – the code is really simple!
Possible Problem
The problem appears to be thread related, and in particular down to the point at which I set the publishing flag in the main thread. After a slight change to that ordering, all seems to work again.
The code below shows the line I moved.
You can see that, as it was, if the callback function was called before the main thread had a chance to set the publishing flag the sequence would be
main thread(false)->callback(false)->main thread(true)
instead of my intended sequence
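main thread(false)->main thread(true)->callback(false).
In the first sequence the flag is left at true with no callback left to clear it, so HandleEvent never calls PublishNext() again.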
```cpp
bool MqttDataProvider::HandleEvent(AppEvent* pEvent)
{
    switch (pEvent->Type())
    {
    case APP_EVENT_TIMECHECK:
        // Only start a new publish when connected and nothing is in flight.
        if ((mbConnected == true) && (mbPublishing.load() == false))
        {
            PublishNext();
        }
        break;
    }
    return true;
}

void MqttDataProvider::PublishNext()
{
    MqttEvent mqttEvent;
    while (mPublishes.TryPop(mqttEvent))
    {
        // Keep topic and content in members so they outlive the async send.
        mPublishTopic = mqttEvent.Topic();
        mPublishContent = mqttEvent.Content();
        mPublishMessage.payload = (void*)mPublishContent.c_str();
        mPublishMessage.payloadlen = mPublishContent.length();

        mbPublishing.store(true); // NOTE: New location

        int rc;
        if ((rc = MQTTAsync_sendMessage(mClient, mPublishTopic.c_str(), &mPublishMessage, &mPublishOptions)) != MQTTASYNC_SUCCESS)
        {
            ERROR("MqttDataProvider Failed to initiate publish [" << rc << "]");
            // The send never started, so no callback will clear the flag for us.
            mbPublishing.store(false);
        }
        else
        {
            INFO("MqttDataProvider Initiated publish");
            //mbPublishing.store(true); // NOTE: Old location
            return;
        }
    }
}

// Called on the library's callback thread when the publish completes.
void MqttDataProvider::OnPublishSuccess(void* context, MQTTAsync_successData* response)
{
    INFO("MqttDataProvider Publish success [" << response->token << "]");
    mbPublishing.store(false);
}

// Called on the library's callback thread when the publish fails.
void MqttDataProvider::OnPublishFailure(void* context, MQTTAsync_failureData* response)
{
    INFO("MqttDataProvider Publish failure [" << (int)(response ? response->code : 0) << "]");
    mbPublishing.store(false);
}
```
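The ordering problem can be shown without MQTT at all. The sketch below is only an illustration, not the real library: `FakeSendAsync`, `PublishFlagAfterSend` and `PublishFlagBeforeSend` are hypothetical names, and the fake send simply invokes the completion callback before it returns, which stands in for the worst case where the callback thread wins the race.

```cpp
#include <atomic>
#include <functional>

// Hypothetical stand-in for an asynchronous send. A real client would invoke
// the callback from its own thread at some later point; here it is called
// before the function returns, to force the worst-case interleaving.
void FakeSendAsync(const std::function<void()>& onComplete)
{
    onComplete();
}

// Old order: the flag is set after the send has been initiated.
// Sequence: callback(false) -> main thread(true), so the flag sticks at true.
bool PublishFlagAfterSend(std::atomic<bool>& publishing)
{
    FakeSendAsync([&publishing] { publishing.store(false); });
    publishing.store(true); // overwrites the callback's "false"
    return publishing.load();
}

// New order: the flag is set before the send is initiated.
// Sequence: main thread(true) -> callback(false), so the flag is cleared.
bool PublishFlagBeforeSend(std::atomic<bool>& publishing)
{
    publishing.store(true);
    FakeSendAsync([&publishing] { publishing.store(false); });
    return publishing.load();
}
```

With a flag that starts at false, `PublishFlagAfterSend` leaves it stuck at true even though the publish has already completed, while `PublishFlagBeforeSend` ends with it back at false. Note that std::atomic on its own does not help here: each store is atomic, but the order of the two stores is still wrong.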